diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationWithRemoteStoreSuiteIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationWithRemoteStoreSuiteIT.java new file mode 100644 index 0000000000000..4319772033a65 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationWithRemoteStoreSuiteIT.java @@ -0,0 +1,118 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.After; +import org.junit.Before; + +import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2) +public class SegmentReplicationWithRemoteStoreSuiteIT extends SegmentReplicationBaseIT { + + private static final String REPOSITORY_NAME = "test-remote-store-repo"; + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(remoteStoreClusterSettings(REPOSITORY_NAME)).build(); + } + + @Before + public void setup() { + internalCluster().startClusterManagerOnlyNode(); + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME) + .setType("fs") + .setSettings(Settings.builder().put("location", randomRepoPath().toAbsolutePath())) + ); + createIndex(INDEX_NAME); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.REMOTE_STORE, "true") + .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") + .build(); + } + + @Override + public Settings indexSettings() { + final Settings.Builder builder = Settings.builder() + .put(super.indexSettings()) + // reset shard & replica count to random values set by OpenSearchIntegTestCase. + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards()) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas()) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + + return builder.build(); + } + + @After + public void teardown() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } + + public void testBasicReplication() throws Exception { + final int docCount = scaledRandomIntBetween(10, 50); + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(); + ensureGreen(INDEX_NAME); + verifyStoreContent(); + } + + public void testDropRandomNodeDuringReplication() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + internalCluster().startClusterManagerOnlyNodes(1); + + final int docCount = scaledRandomIntBetween(10, 50); + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(); + + internalCluster().restartRandomDataNode(); + + ensureYellow(INDEX_NAME); + client().prepareIndex(INDEX_NAME).setId(Integer.toString(docCount)).setSource("field", "value" + docCount).execute().get(); + internalCluster().startDataOnlyNode(); + client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet(); + } + + public void testDeleteIndexWhileReplicating() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final int docCount = scaledRandomIntBetween(10, 50); + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet(); + } + + public void testFullRestartDuringReplication() throws Exception { + internalCluster().startNode(); + final int docCount = scaledRandomIntBetween(10, 50); + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + internalCluster().fullRestart(); + ensureGreen(INDEX_NAME); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 7a5f9608dace0..829012a65b991 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -158,14 +158,14 @@ public void startReplication(ActionListener listener) { logger.trace(new ParameterizedMessage("Starting Replication Target: {}", description())); // Get list of files to copy from this checkpoint. state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO); - cancellableThreads.checkForCancel(); - source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener); + cancellableThreads.execute(() -> source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener)); checkpointInfoListener.whenComplete(checkpointInfo -> { final List filesToFetch = getFiles(checkpointInfo); state.setStage(SegmentReplicationState.Stage.GET_FILES); - cancellableThreads.checkForCancel(); - source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, indexShard, getFilesListener); + cancellableThreads.execute( + () -> source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, indexShard, getFilesListener) + ); }, listener::onFailure); getFilesListener.whenComplete(response -> { @@ -175,7 +175,6 @@ public void startReplication(ActionListener listener) { } private List getFiles(CheckpointInfoResponse checkpointInfo) throws IOException { - cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FILE_DIFF); final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap()); logger.trace(() -> new ParameterizedMessage("Replication diff for checkpoint {} {}", checkpointInfo.getCheckpoint(), diff)); @@ -201,7 +200,6 @@ private List getFiles(CheckpointInfoResponse checkpointInfo) } private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) throws OpenSearchCorruptionException { - cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); // Handle empty SegmentInfos bytes for recovering replicas if (checkpointInfoResponse.getInfosBytes() == null) { diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 8e27c9ff9ae1a..8622e5944b165 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -14,14 +14,23 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.action.ActionListener; import org.opensearch.index.engine.DocIdSeqNoAndSource; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.mapper.MapperService; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.CheckpointInfoResponse; +import org.opensearch.indices.replication.GetSegmentFilesResponse; +import org.opensearch.indices.replication.RemoteStoreReplicationSource; +import org.opensearch.indices.replication.SegmentReplicationSourceFactory; +import org.opensearch.indices.replication.SegmentReplicationTargetService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationType; import org.hamcrest.MatcherAssert; +import org.junit.Assert; import org.junit.Before; import java.io.IOException; @@ -36,6 +45,9 @@ import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class RemoteIndexShardTests extends SegmentReplicationIndexShardTests { @@ -79,6 +91,56 @@ public void testNRTReplicaWithRemoteStorePromotedAsPrimaryCommitCommit() throws testNRTReplicaWithRemoteStorePromotedAsPrimary(true, true); } + public void testCloseShardWhileGettingCheckpoint() throws Exception { + String indexMapping = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": {} }"; + try ( + ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), createTempDir()) + ) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + primary.refresh("Test"); + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + Runnable beforeCkpSourceCall = () -> targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + Runnable beforeGetFilesSourceCall = () -> Assert.fail("Should not have been executed"); + TestRSReplicationSource testRSReplicationSource = new TestRSReplicationSource( + replica, + beforeCkpSourceCall, + beforeGetFilesSourceCall + ); + when(sourceFactory.get(any())).thenReturn(testRSReplicationSource); + startReplicationAndAssertCancellation(replica, primary, targetService); + shards.removeReplica(replica); + closeShards(replica); + } + } + + public void testBeforeIndexShardClosedWhileCopyingFiles() throws Exception { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + shards.indexDocs(10); + primary.refresh("Test"); + + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + Runnable beforeCkpSourceCall = () -> {}; + Runnable beforeGetFilesSourceCall = () -> targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + TestRSReplicationSource testRSReplicationSource = new TestRSReplicationSource( + replica, + beforeCkpSourceCall, + beforeGetFilesSourceCall + ); + when(sourceFactory.get(any())).thenReturn(testRSReplicationSource); + startReplicationAndAssertCancellation(replica, primary, targetService); + shards.removeReplica(replica); + closeShards(replica); + } + } + public void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlushFirst, boolean performFlushSecond) throws Exception { try ( ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), createTempDir()) @@ -351,3 +413,42 @@ private void assertSingleSegmentFile(IndexShard shard, String fileName) throws I assertEquals(segmentsFileNames.stream().findFirst().get(), fileName); } } + +class TestRSReplicationSource extends RemoteStoreReplicationSource { + + private final Thread beforeCheckpoint; + private final Thread beforeGetFiles; + + public TestRSReplicationSource(IndexShard indexShard, Runnable beforeCheckpoint, Runnable beforeGetFiles) { + super(indexShard); + this.beforeCheckpoint = new Thread(beforeCheckpoint); + this.beforeGetFiles = new Thread(beforeGetFiles); + } + + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + this.beforeCheckpoint.start(); + super.getCheckpointMetadata(replicationId, checkpoint, listener); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + this.beforeGetFiles.start(); + super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, listener); + } + + @Override + public String getDescription() { + return "TestReplicationSource"; + } +} diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 807b4a9cd7482..7b16ebef2170c 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -26,7 +26,6 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.CancellableThreads; import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.index.IndexSettings; @@ -36,7 +35,6 @@ import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; -import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.SnapshotMatchers; @@ -44,8 +42,6 @@ import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.CheckpointInfoResponse; -import org.opensearch.indices.replication.GetSegmentFilesResponse; -import org.opensearch.indices.replication.SegmentReplicationSource; import org.opensearch.indices.replication.SegmentReplicationSourceFactory; import org.opensearch.indices.replication.SegmentReplicationState; import org.opensearch.indices.replication.SegmentReplicationTarget; @@ -84,7 +80,6 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase { @@ -675,58 +670,6 @@ public void testCloseShardDuringFinalize() throws Exception { } } - public void testBeforeIndexShardClosedWhileCopyingFiles() throws Exception { - try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { - shards.startAll(); - IndexShard primary = shards.getPrimary(); - final IndexShard replica = shards.getReplicas().get(0); - - primary.refresh("Test"); - - final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); - final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - SegmentReplicationSource source = new TestReplicationSource() { - - ActionListener listener; - - @Override - public void getCheckpointMetadata( - long replicationId, - ReplicationCheckpoint checkpoint, - ActionListener listener - ) { - resolveCheckpointInfoResponseListener(listener, primary); - } - - @Override - public void getSegmentFiles( - long replicationId, - ReplicationCheckpoint checkpoint, - List filesToFetch, - IndexShard indexShard, - ActionListener listener - ) { - // set the listener, we will only fail it once we cancel the source. - this.listener = listener; - // shard is closing while we are copying files. - targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); - } - - @Override - public void cancel() { - // simulate listener resolving, but only after we have issued a cancel from beforeIndexShardClosed . - final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("retryable action was cancelled"); - listener.onFailure(exception); - } - }; - when(sourceFactory.get(any())).thenReturn(source); - startReplicationAndAssertCancellation(replica, primary, targetService); - - shards.removeReplica(replica); - closeShards(replica); - } - } - protected SegmentReplicationTargetService newTargetService(SegmentReplicationSourceFactory sourceFactory) { return new SegmentReplicationTargetService( threadPool, diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java index c394101697b47..353968db16e66 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java @@ -709,4 +709,56 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception { } } + public void testBeforeIndexShardClosedWhileCopyingFiles() throws Exception { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + primary.refresh("Test"); + + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + SegmentReplicationSource source = new TestReplicationSource() { + + ActionListener listener; + + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + resolveCheckpointInfoResponseListener(listener, primary); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + // set the listener, we will only fail it once we cancel the source. + this.listener = listener; + // shard is closing while we are copying files. + targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + } + + @Override + public void cancel() { + // simulate listener resolving, but only after we have issued a cancel from beforeIndexShardClosed . + final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("retryable action was cancelled"); + listener.onFailure(exception); + } + }; + when(sourceFactory.get(any())).thenReturn(source); + startReplicationAndAssertCancellation(replica, primary, targetService); + + shards.removeReplica(replica); + closeShards(replica); + } + } + } diff --git a/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java b/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java index b29e25a0bff2c..c75f105c0759c 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java @@ -41,6 +41,6 @@ public abstract void getSegmentFiles( @Override public String getDescription() { - return "This is a test description"; + return "TestReplicationSource"; } }