From 7829c7118409c09105a9271a2c05f988f228f1d3 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Tue, 30 Jan 2024 19:04:27 +0530 Subject: [PATCH] Ensure latest replication checkpoint post failover has correct operational primary term (#11990) * Force update operation primary term in replication checkout post failover Signed-off-by: bansvaru Signed-off-by: Shivansh Arora --- .../remotestore/RemoteStoreRestoreIT.java | 53 +++++++++++++++++++ .../opensearch/index/shard/IndexShard.java | 5 +- .../shard/RemoteStoreRefreshListener.java | 11 ++++ .../RemoteStoreRefreshListenerTests.java | 48 +++++++++++++++++ 4 files changed, 115 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java index 72296a1c01a24..dfa5528eafcf2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java @@ -17,6 +17,11 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; +import org.opensearch.indices.IndicesService; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.test.InternalTestCluster; @@ -135,6 +140,54 @@ private void testRestoreFlow(int numberOfIterations, boolean invokeFlush, boolea restoreAndVerify(shardCount, 0, indexStats); } + public void testMultipleWriters() throws Exception { + prepareCluster(1, 2, INDEX_NAME, 1, 1); + Map indexStats = indexData(randomIntBetween(2, 5), true, true, INDEX_NAME); + assertEquals(2, getNumShards(INDEX_NAME).totalNumShards); + + // ensure replica has latest checkpoint + flushAndRefresh(INDEX_NAME); + flushAndRefresh(INDEX_NAME); + + Index indexObj = clusterService().state().metadata().indices().get(INDEX_NAME).getIndex(); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNodeName(INDEX_NAME)); + IndexService indexService = indicesService.indexService(indexObj); + IndexShard indexShard = indexService.getShard(0); + RemoteSegmentMetadata remoteSegmentMetadataBeforeFailover = indexShard.getRemoteDirectory().readLatestMetadataFile(); + + // ensure all segments synced to replica + assertBusy( + () -> assertHitCount( + client(primaryNodeName(INDEX_NAME)).prepareSearch(INDEX_NAME).setSize(0).get(), + indexStats.get(TOTAL_OPERATIONS) + ), + 30, + TimeUnit.SECONDS + ); + assertBusy( + () -> assertHitCount( + client(replicaNodeName(INDEX_NAME)).prepareSearch(INDEX_NAME).setSize(0).get(), + indexStats.get(TOTAL_OPERATIONS) + ), + 30, + TimeUnit.SECONDS + ); + + String newPrimaryNodeName = replicaNodeName(INDEX_NAME); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); + ensureYellow(INDEX_NAME); + + indicesService = internalCluster().getInstance(IndicesService.class, newPrimaryNodeName); + indexService = indicesService.indexService(indexObj); + indexShard = indexService.getShard(0); + IndexShard finalIndexShard = indexShard; + assertBusy(() -> assertTrue(finalIndexShard.isStartedPrimary() && finalIndexShard.isPrimaryMode())); + assertEquals( + finalIndexShard.getLatestSegmentInfosAndCheckpoint().v2().getPrimaryTerm(), + remoteSegmentMetadataBeforeFailover.getPrimaryTerm() + 1 + ); + } + /** * Helper function to test restoring an index having replicas from remote store when all the nodes housing the primary/replica drop. * @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data. diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index cbb246219546b..5834eabfa9af0 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1697,7 +1697,8 @@ ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) th } final ReplicationCheckpoint latestReplicationCheckpoint = getLatestReplicationCheckpoint(); if (latestReplicationCheckpoint.getSegmentInfosVersion() == segmentInfos.getVersion() - && latestReplicationCheckpoint.getSegmentsGen() == segmentInfos.getGeneration()) { + && latestReplicationCheckpoint.getSegmentsGen() == segmentInfos.getGeneration() + && latestReplicationCheckpoint.getPrimaryTerm() == getOperationPrimaryTerm()) { return latestReplicationCheckpoint; } final Map metadataMap = store.getSegmentMetadataMap(segmentInfos); @@ -2014,7 +2015,7 @@ public void close(String reason, boolean flushEngine, boolean deleted) throws IO /* ToDo : Fix this https://github.com/opensearch-project/OpenSearch/issues/8003 */ - private RemoteSegmentStoreDirectory getRemoteDirectory() { + public RemoteSegmentStoreDirectory getRemoteDirectory() { assert indexSettings.isRemoteStoreEnabled(); assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory"; FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory(); diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index d96a7e7c95ecf..e96a4bb19a960 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -41,6 +41,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -209,6 +210,16 @@ private boolean syncSegments() { try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); final ReplicationCheckpoint checkpoint = indexShard.computeReplicationCheckpoint(segmentInfos); + if (checkpoint.getPrimaryTerm() != indexShard.getOperationPrimaryTerm()) { + throw new IllegalStateException( + String.format( + Locale.ROOT, + "primaryTerm mismatch during segments upload to remote store [%s] != [%s]", + checkpoint.getPrimaryTerm(), + indexShard.getOperationPrimaryTerm() + ) + ); + } // Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can // move. long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 555284e55a0fa..6567cb03f3dc6 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -425,6 +425,33 @@ public void testTrackerData() throws Exception { assertBusy(() -> assertNoLag(tracker)); } + /** + * Tests segments upload fails with replication checkpoint and replication tracker primary term mismatch + */ + public void testRefreshFailedDueToPrimaryTermMisMatch() throws Exception { + int totalAttempt = 1; + int checkpointPublishSucceedOnAttempt = 0; + // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. + CountDownLatch refreshCountLatch = new CountDownLatch(totalAttempt); + + // success latch should change as we would be failed primary term latest validation. + CountDownLatch successLatch = new CountDownLatch(1); + CountDownLatch reachedCheckpointPublishLatch = new CountDownLatch(0); + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + totalAttempt, + refreshCountLatch, + successLatch, + checkpointPublishSucceedOnAttempt, + reachedCheckpointPublishLatch, + false + ); + + assertBusy(() -> assertEquals(1, tuple.v2().getRemoteSegmentTransferTracker(indexShard.shardId()).getTotalUploadsFailed())); + assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); + assertBusy(() -> assertEquals(1, successLatch.getCount())); + assertBusy(() -> assertEquals(0, reachedCheckpointPublishLatch.getCount())); + } + private void assertNoLag(RemoteSegmentTransferTracker tracker) { assertEquals(0, tracker.getRefreshSeqNoLag()); assertEquals(0, tracker.getBytesLag()); @@ -460,6 +487,24 @@ private Tuple mockIn CountDownLatch successLatch, int succeedCheckpointPublishOnAttempt, CountDownLatch reachedCheckpointPublishLatch + ) throws IOException { + return mockIndexShardWithRetryAndScheduleRefresh( + succeedOnAttempt, + refreshCountLatch, + successLatch, + succeedCheckpointPublishOnAttempt, + reachedCheckpointPublishLatch, + true + ); + } + + private Tuple mockIndexShardWithRetryAndScheduleRefresh( + int succeedOnAttempt, + CountDownLatch refreshCountLatch, + CountDownLatch successLatch, + int succeedCheckpointPublishOnAttempt, + CountDownLatch reachedCheckpointPublishLatch, + boolean mockPrimaryTerm ) throws IOException { // Create index shard that we will be using to mock different methods in IndexShard for the unit test indexShard = newStartedShard( @@ -500,6 +545,9 @@ private Tuple mockIn when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); // Mock indexShard.getOperationPrimaryTerm() + if (mockPrimaryTerm) { + when(shard.getOperationPrimaryTerm()).thenReturn(indexShard.getOperationPrimaryTerm()); + } when(shard.getLatestReplicationCheckpoint()).thenReturn(indexShard.getLatestReplicationCheckpoint()); // Mock indexShard.routingEntry().primary()