Skip to content

Commit

Permalink
Ensure latest replication checkpoint post failover has correct operational primary term (opensearch-project#11990) (opensearch-project#12083)
Browse files Browse the repository at this point in the history

* Force update operation primary term in replication checkpoint post failover


(cherry picked from commit c55af66)

Signed-off-by: bansvaru <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 08bbddc commit e8a9944
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.test.InternalTestCluster;
Expand Down Expand Up @@ -135,6 +140,54 @@ private void testRestoreFlow(int numberOfIterations, boolean invokeFlush, boolea
restoreAndVerify(shardCount, 0, indexStats);
}

/**
 * Verifies that after a primary failover, the replication checkpoint computed on the
 * newly promoted primary reflects the incremented operation primary term, rather than
 * the stale primary term recorded in the remote segment metadata before the failover.
 */
public void testMultipleWriters() throws Exception {
    prepareCluster(1, 2, INDEX_NAME, 1, 1);
    Map<String, Long> indexStats = indexData(randomIntBetween(2, 5), true, true, INDEX_NAME);
    assertEquals(2, getNumShards(INDEX_NAME).totalNumShards);

    // ensure replica has latest checkpoint
    flushAndRefresh(INDEX_NAME);
    flushAndRefresh(INDEX_NAME);

    Index indexObj = clusterService().state().metadata().indices().get(INDEX_NAME).getIndex();
    IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNodeName(INDEX_NAME));
    IndexService indexService = indicesService.indexService(indexObj);
    IndexShard indexShard = indexService.getShard(0);
    // Snapshot remote metadata now so the pre-failover primary term can be compared after promotion.
    RemoteSegmentMetadata remoteSegmentMetadataBeforeFailover = indexShard.getRemoteDirectory().readLatestMetadataFile();

    // ensure all segments synced to replica
    assertBusy(
        () -> assertHitCount(
            client(primaryNodeName(INDEX_NAME)).prepareSearch(INDEX_NAME).setSize(0).get(),
            indexStats.get(TOTAL_OPERATIONS)
        ),
        30,
        TimeUnit.SECONDS
    );
    assertBusy(
        () -> assertHitCount(
            client(replicaNodeName(INDEX_NAME)).prepareSearch(INDEX_NAME).setSize(0).get(),
            indexStats.get(TOTAL_OPERATIONS)
        ),
        30,
        TimeUnit.SECONDS
    );

    // Stop the current primary so the replica gets promoted; promotion bumps the primary term by one.
    String newPrimaryNodeName = replicaNodeName(INDEX_NAME);
    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
    ensureYellow(INDEX_NAME);

    indicesService = internalCluster().getInstance(IndicesService.class, newPrimaryNodeName);
    indexService = indicesService.indexService(indexObj);
    indexShard = indexService.getShard(0);
    IndexShard finalIndexShard = indexShard;
    assertBusy(() -> assertTrue(finalIndexShard.isStartedPrimary() && finalIndexShard.isPrimaryMode()));
    // Fixed argument order: assertEquals(expected, actual). The checkpoint on the promoted
    // primary must carry the pre-failover primary term incremented by one.
    assertEquals(
        remoteSegmentMetadataBeforeFailover.getPrimaryTerm() + 1,
        finalIndexShard.getLatestSegmentInfosAndCheckpoint().v2().getPrimaryTerm()
    );
}

/**
* Helper function to test restoring an index having replicas from remote store when all the nodes housing the primary/replica drop.
* @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1704,7 +1704,8 @@ ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) th
}
final ReplicationCheckpoint latestReplicationCheckpoint = getLatestReplicationCheckpoint();
if (latestReplicationCheckpoint.getSegmentInfosVersion() == segmentInfos.getVersion()
&& latestReplicationCheckpoint.getSegmentsGen() == segmentInfos.getGeneration()) {
&& latestReplicationCheckpoint.getSegmentsGen() == segmentInfos.getGeneration()
&& latestReplicationCheckpoint.getPrimaryTerm() == getOperationPrimaryTerm()) {
return latestReplicationCheckpoint;
}
final Map<String, StoreFileMetadata> metadataMap = store.getSegmentMetadataMap(segmentInfos);
Expand Down Expand Up @@ -2023,7 +2024,7 @@ public void close(String reason, boolean flushEngine, boolean deleted) throws IO
/*
ToDo : Fix this https://github.com/opensearch-project/OpenSearch/issues/8003
*/
private RemoteSegmentStoreDirectory getRemoteDirectory() {
public RemoteSegmentStoreDirectory getRemoteDirectory() {
assert indexSettings.isRemoteStoreEnabled();
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -209,6 +210,16 @@ private boolean syncSegments() {
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
final ReplicationCheckpoint checkpoint = indexShard.computeReplicationCheckpoint(segmentInfos);
if (checkpoint.getPrimaryTerm() != indexShard.getOperationPrimaryTerm()) {
throw new IllegalStateException(
String.format(
Locale.ROOT,
"primaryTerm mismatch during segments upload to remote store [%s] != [%s]",
checkpoint.getPrimaryTerm(),
indexShard.getOperationPrimaryTerm()
)
);
}
// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
// move.
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,33 @@ public void testTrackerData() throws Exception {
assertBusy(() -> assertNoLag(tracker));
}

/**
* Tests segments upload fails with replication checkpoint and replication tracker primary term mismatch
*/
/**
 * Verifies that a segment upload fails when the replication checkpoint's primary term
 * does not match the shard's operation primary term.
 */
public void testRefreshFailedDueToPrimaryTermMisMatch() throws Exception {
    final int totalAttempt = 1;
    final int checkpointPublishSucceedOnAttempt = 0;

    // Counts down once per refresh attempt; exactly one attempt is expected.
    final CountDownLatch refreshCountLatch = new CountDownLatch(totalAttempt);
    // Remains at 1 because the primary-term validation failure prevents a successful upload.
    final CountDownLatch successLatch = new CountDownLatch(1);
    // Starts at 0: checkpoint publish is never expected to be reached.
    final CountDownLatch reachedCheckpointPublishLatch = new CountDownLatch(0);

    // Pass mockPrimaryTerm=false so the mocked shard reports a mismatching primary term.
    final Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> listenerAndTrackerFactory =
        mockIndexShardWithRetryAndScheduleRefresh(
            totalAttempt,
            refreshCountLatch,
            successLatch,
            checkpointPublishSucceedOnAttempt,
            reachedCheckpointPublishLatch,
            false
        );

    assertBusy(
        () -> assertEquals(
            1,
            listenerAndTrackerFactory.v2().getRemoteSegmentTransferTracker(indexShard.shardId()).getTotalUploadsFailed()
        )
    );
    assertBusy(() -> assertEquals(0, refreshCountLatch.getCount()));
    assertBusy(() -> assertEquals(1, successLatch.getCount()));
    assertBusy(() -> assertEquals(0, reachedCheckpointPublishLatch.getCount()));
}

private void assertNoLag(RemoteSegmentTransferTracker tracker) {
assertEquals(0, tracker.getRefreshSeqNoLag());
assertEquals(0, tracker.getBytesLag());
Expand Down Expand Up @@ -460,6 +487,24 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
CountDownLatch successLatch,
int succeedCheckpointPublishOnAttempt,
CountDownLatch reachedCheckpointPublishLatch
) throws IOException {
return mockIndexShardWithRetryAndScheduleRefresh(
succeedOnAttempt,
refreshCountLatch,
successLatch,
succeedCheckpointPublishOnAttempt,
reachedCheckpointPublishLatch,
true
);
}

private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIndexShardWithRetryAndScheduleRefresh(
int succeedOnAttempt,
CountDownLatch refreshCountLatch,
CountDownLatch successLatch,
int succeedCheckpointPublishOnAttempt,
CountDownLatch reachedCheckpointPublishLatch,
boolean mockPrimaryTerm
) throws IOException {
// Create index shard that we will be using to mock different methods in IndexShard for the unit test
indexShard = newStartedShard(
Expand Down Expand Up @@ -492,6 +537,9 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory);

// Mock indexShard.getOperationPrimaryTerm()
if (mockPrimaryTerm) {
when(shard.getOperationPrimaryTerm()).thenReturn(indexShard.getOperationPrimaryTerm());
}
when(shard.getLatestReplicationCheckpoint()).thenReturn(indexShard.getLatestReplicationCheckpoint());

// Mock indexShard.routingEntry().primary()
Expand Down

0 comments on commit e8a9944

Please sign in to comment.