diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 7e0c1630a76e4..3ef34c90972f4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -88,6 +88,7 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.blobstore.BlobStoreTestUtil; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.test.DummyShardLock; import org.opensearch.test.IndexSettingsModule; @@ -714,7 +715,8 @@ public static final IndexShard newIndexShard( () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, nodeId, null, - false + false, + BlobStoreTestUtil.mockClusterService() ); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/DocrepToRemoteDualReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/DocrepToRemoteDualReplicationIT.java new file mode 100644 index 0000000000000..a9b3e72353a71 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/DocrepToRemoteDualReplicationIT.java @@ -0,0 +1,295 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotemigration; + +import org.opensearch.action.admin.indices.stats.CommonStats; +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.remote.RemoteSegmentStats; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.junit.annotations.TestLogging; + +import java.util.Map; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) +public class DocrepToRemoteDualReplicationIT extends MigrationBaseTestCase { + public String REMOTE_PRI_DOCREP_REP = "remote-primary-docrep-replica"; + public String REMOTE_PRI_DOCREP_REMOTE_REP = "remote-primary-docrep-remote-replica"; + public String FAILOVER_REMOTE_TO_DOCREP = "failover-remote-to-docrep"; + + /* + Scenario: + - Starts 2 docrep backed and 1 remote backed data node + - Exclude remote backed node from shard assignment + - Index some docs + - Move primary copy from docrep to remote through _cluster/reroute + - Index some more docs + - Assert primary-replica consistency + */ + public void testRemotePrimaryDocRepReplica() throws Exception { + internalCluster().setBootstrapClusterManagerNodeIndex(0); + internalCluster().startClusterManagerOnlyNode(); + initDocRepToRemoteMigration(); + + logger.info("---> Starting 2 docrep data nodes"); + internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(); + internalCluster().validateClusterFormed(); + 
assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0); + + logger.info("---> Starting 1 remote enabled data node"); + addRemote = true; + String remoteNodeName = internalCluster().startDataOnlyNode(); + internalCluster().validateClusterFormed(); + assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories(REPOSITORY_NAME, REPOSITORY_2_NAME).get().repositories().size(), 2); + + logger.info("---> Excluding remote node from shard assignment"); + Settings excludeRemoteNode = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), remoteNodeName) + .build(); + createIndex(REMOTE_PRI_DOCREP_REP, excludeRemoteNode); + ensureGreen(REMOTE_PRI_DOCREP_REP); + + int initialBatch = randomIntBetween(1, 1000); + logger.info("---> Indexing {} docs", initialBatch); + SyncIndexingService indexingService = new SyncIndexingService(REMOTE_PRI_DOCREP_REP, initialBatch); + indexingService.startIndexing(); + + Settings includeRemoteNode = Settings.builder() + .putNull(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey()) + .build(); + assertAcked(internalCluster().client().admin().indices().prepareUpdateSettings().setIndices(REMOTE_PRI_DOCREP_REP).setSettings(includeRemoteNode).get()); + ensureGreen(REMOTE_PRI_DOCREP_REP); + + String primaryShardHostingNode = primaryNodeName(REMOTE_PRI_DOCREP_REP); + logger.info("---> Moving primary copy from {} to remote enabled node {}", primaryShardHostingNode, remoteNodeName); + assertAcked(internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(REMOTE_PRI_DOCREP_REP, 0, primaryShardHostingNode, remoteNodeName)).get()); + ensureGreen(REMOTE_PRI_DOCREP_REP); + + int secondBatch = randomIntBetween(1, 10); + logger.info("---> Indexing another {} docs", secondBatch); + indexBulk(REMOTE_PRI_DOCREP_REP, secondBatch); + // Defensive check to ensure that doc count in replica shard catches up to the primary copy + flush(REMOTE_PRI_DOCREP_REP); + Map shardStatsMap = internalCluster().client().admin().indices().prepareStats(REMOTE_PRI_DOCREP_REP).setDocs(true).get().asMap(); + DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); + assertReplicaAndPrimaryConsistencySingle(shardStatsMap, initialBatch, secondBatch, nodes); + } + + /* + Scenario: + - Starts 1 docrep backed data node + - Creates an index with 0 replica + - Starts 1 remote backed data node + - Index some docs + - Move primary copy from docrep to remote through _cluster/reroute + - Starts another remote backed data node + - Expands index to 2 replicas. 
One replica copy lies in remote backed node and other in docrep backed node + - Index some more docs + - Assert primary-replica consistency + */ + public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception { + internalCluster().setBootstrapClusterManagerNodeIndex(0); + internalCluster().startClusterManagerOnlyNode(); + initDocRepToRemoteMigration(); + + logger.info("---> Starting 1 docrep data nodes"); + String docrepNodeName = internalCluster().startDataOnlyNode(); + internalCluster().validateClusterFormed(); + assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0); + + logger.info("---> Creating index with 0 replica"); + Settings excludeRemoteNode = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build(); + createIndex(REMOTE_PRI_DOCREP_REMOTE_REP, excludeRemoteNode); + ensureGreen(REMOTE_PRI_DOCREP_REMOTE_REP); + + logger.info("---> Starting 1 remote enabled data node"); + addRemote = true; + String remoteNodeName = internalCluster().startDataOnlyNode(); + internalCluster().validateClusterFormed(); + assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories(REPOSITORY_NAME, REPOSITORY_2_NAME).get().repositories().size(), 2); + + int firstBatch = randomIntBetween(1, 100); + logger.info("---> Indexing {} docs", firstBatch); + SyncIndexingService indexingService = new SyncIndexingService(REMOTE_PRI_DOCREP_REMOTE_REP, firstBatch); + indexingService.startIndexing(); + + String primaryShardHostingNode = primaryNodeName(REMOTE_PRI_DOCREP_REMOTE_REP); + logger.info("---> Moving primary copy from {} to remote enabled node {}", primaryShardHostingNode, remoteNodeName); + assertAcked(internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(REMOTE_PRI_DOCREP_REMOTE_REP, 0, primaryShardHostingNode, remoteNodeName)).get()); + ensureGreen(REMOTE_PRI_DOCREP_REMOTE_REP); + + logger.info("---> Starting another remote enabled node"); + internalCluster().startDataOnlyNode(); + internalCluster().validateClusterFormed(); + + logger.info("---> Expanding index to 2 replica copies"); + Settings twoReplicas = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .build(); + assertAcked(internalCluster().client().admin().indices().prepareUpdateSettings().setIndices(REMOTE_PRI_DOCREP_REMOTE_REP).setSettings(twoReplicas).get()); + ensureGreen(REMOTE_PRI_DOCREP_REMOTE_REP); + + int secondBatch = randomIntBetween(1, 10); + logger.info("---> Indexing another {} docs", secondBatch); + indexBulk(REMOTE_PRI_DOCREP_REMOTE_REP, secondBatch); + // Defensive check to ensure that doc count in replica shard catches up to the primary copy + flush(REMOTE_PRI_DOCREP_REMOTE_REP); + + Map shardStatsMap = internalCluster().client().admin().indices().prepareStats(REMOTE_PRI_DOCREP_REMOTE_REP).setDocs(true).get().asMap(); + DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); + assertReplicaAndPrimaryConsistencyMultiCopy(shardStatsMap, firstBatch, secondBatch, nodes); + } + + public void testFailoverRemotePrimaryToDocrepReplica() throws Exception { + internalCluster().setBootstrapClusterManagerNodeIndex(0); + internalCluster().startClusterManagerOnlyNode(); + initDocRepToRemoteMigration(); + + logger.info("---> Starting 1 docrep data nodes"); + String docrepNodeName = internalCluster().startDataOnlyNode(); + internalCluster().validateClusterFormed(); + 
assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0);
+
+        logger.info("---> Creating index with 0 replica");
+        Settings excludeRemoteNode = Settings.builder()
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build();
+        createIndex(FAILOVER_REMOTE_TO_DOCREP, excludeRemoteNode);
+        ensureGreen(FAILOVER_REMOTE_TO_DOCREP);
+
+        logger.info("---> Starting 1 remote enabled data node");
+        addRemote = true;
+        String remoteNodeName = internalCluster().startDataOnlyNode();
+        internalCluster().validateClusterFormed();
+        assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories(REPOSITORY_NAME, REPOSITORY_2_NAME).get().repositories().size(), 2);
+
+        int firstBatch = randomIntBetween(1, 100);
+        logger.info("---> Indexing {} docs", firstBatch);
+        SyncIndexingService indexingService = new SyncIndexingService(FAILOVER_REMOTE_TO_DOCREP, firstBatch);
+        indexingService.startIndexing();
+
+        String primaryShardHostingNode = primaryNodeName(FAILOVER_REMOTE_TO_DOCREP);
+        logger.info("---> Moving primary copy from {} to remote enabled node {}", primaryShardHostingNode, remoteNodeName);
+        assertAcked(internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_DOCREP, 0, primaryShardHostingNode, remoteNodeName)).get());
+        ensureGreen(FAILOVER_REMOTE_TO_DOCREP);
+
+        logger.info("---> Expanding index to 1 replica copy");
+        Settings oneReplica = Settings.builder()
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+            .build();
+        assertAcked(internalCluster().client().admin().indices().prepareUpdateSettings().setIndices(FAILOVER_REMOTE_TO_DOCREP).setSettings(oneReplica).get());
+        ensureGreen(FAILOVER_REMOTE_TO_DOCREP);
+
+        flush(FAILOVER_REMOTE_TO_DOCREP);
+        Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_DOCREP).setDocs(true).get().asMap();
+        DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
+        long initialPrimaryDocCount = 0;
+        for (ShardRouting shardRouting : shardStatsMap.keySet()) {
+            if (shardRouting.primary()) {
+                initialPrimaryDocCount = shardStatsMap.get(shardRouting).getStats().getDocs().getCount();
+            }
+        }
+        assertReplicaAndPrimaryConsistencySingle(shardStatsMap, firstBatch, 0, nodes);
+
+        logger.info("---> Stop remote store enabled node");
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(remoteNodeName));
+        ensureStableCluster(2);
+        ensureYellow(FAILOVER_REMOTE_TO_DOCREP);
+
+        shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_DOCREP).setDocs(true).get().asMap();
+        long primaryDocCountAfterFailover = 0;
+        for (ShardRouting shardRouting : shardStatsMap.keySet()) {
+            if (shardRouting.primary()) {
+                assertFalse(shardRouting.isAssignedToRemoteStoreNode());
+                primaryDocCountAfterFailover = shardStatsMap.get(shardRouting).getStats().getDocs().getCount();
+            }
+        }
+        assertEquals(initialPrimaryDocCount, primaryDocCountAfterFailover);
+
+        logger.info("---> Index some more docs to ensure that the failed over primary is ingesting new docs");
+        int secondBatch = randomIntBetween(1, 10);
+        logger.info("---> Indexing {} more docs", secondBatch);
+        indexingService = new SyncIndexingService(FAILOVER_REMOTE_TO_DOCREP, secondBatch);
+        indexingService.startIndexing();
+        flush(FAILOVER_REMOTE_TO_DOCREP);
+
+        shardStatsMap =
internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_DOCREP).setDocs(true).get().asMap(); + assertEquals(1, shardStatsMap.size()); + shardStatsMap.forEach( + (shardRouting, shardStats) -> { + assertEquals(firstBatch + secondBatch, shardStats.getStats().getDocs().getCount()); + } + ); + } + + private void assertReplicaAndPrimaryConsistencyMultiCopy(Map shardStatsMap, int firstBatch, int secondBatch, DiscoveryNodes nodes) throws Exception { + assertBusy( + () -> { + for (ShardRouting shardRouting : shardStatsMap.keySet()) { + CommonStats shardStats = shardStatsMap.get(shardRouting).getStats(); + if (shardRouting.primary()) { + assertEquals(firstBatch + secondBatch, shardStats.getDocs().getCount()); + assertTrue(nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()); + RemoteSegmentStats remoteSegmentStats = shardStats.getSegments().getRemoteSegmentStats(); + assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0); + assertTrue(remoteSegmentStats.getTotalUploadTime() > 0); + } else { + boolean remoteNode = nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode(); + assertEquals("Mismatched doc count. Is this on remote node ? " + remoteNode, firstBatch + secondBatch, shardStats.getDocs().getCount()); + RemoteSegmentStats remoteSegmentStats = shardStats.getSegments().getRemoteSegmentStats(); + if (remoteNode) { + assertTrue(remoteSegmentStats.getDownloadBytesStarted() > 0); + assertTrue(remoteSegmentStats.getTotalDownloadTime() > 0); + } else { + assertEquals(0, remoteSegmentStats.getUploadBytesSucceeded()); + assertEquals(0, remoteSegmentStats.getTotalUploadTime()); + } + } + } + } + ); + } + + private void assertReplicaAndPrimaryConsistencySingle(Map shardStatsMap, int initialBatch, int secondBatch, DiscoveryNodes nodes) throws Exception { + assertBusy( + () -> { + long primaryDocCount = 0, replicaDocCount = 0; + for (ShardRouting shardRouting : shardStatsMap.keySet()) { + CommonStats shardStats = shardStatsMap.get(shardRouting).getStats(); + if (shardRouting.primary()) { + primaryDocCount = shardStats.getDocs().getCount(); + assertTrue(nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()); + RemoteSegmentStats remoteSegmentStats = shardStats.getSegments().getRemoteSegmentStats(); + assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0); + assertTrue(remoteSegmentStats.getTotalUploadTime() > 0); + } else { + replicaDocCount = shardStats.getDocs().getCount(); + assertFalse(nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()); + RemoteSegmentStats remoteSegmentStats = shardStats.getSegments().getRemoteSegmentStats(); + assertEquals(0, remoteSegmentStats.getDownloadBytesStarted()); + assertEquals(0, remoteSegmentStats.getTotalDownloadTime()); + } + } + assertTrue(replicaDocCount > 0); + assertEquals(replicaDocCount, initialBatch + secondBatch); + assertEquals(primaryDocCount, replicaDocCount); + } + ); + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index 88d6f6897ee68..62b43b48c73c6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -8,12 +8,24 @@ package org.opensearch.remotemigration; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.bulk.BulkRequest; +import 
org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.delete.DeleteResponse;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.common.UUIDs;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.util.FeatureFlags;
 import org.opensearch.test.OpenSearchIntegTestCase;
 
 import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
+import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
 import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
 
 public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
@@ -22,9 +34,16 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
     protected Path segmentRepoPath;
     protected Path translogRepoPath;
 
-    boolean addRemote = false;
+    private final List<String> documentKeys = List.of(
+        randomAlphaOfLength(5),
+        randomAlphaOfLength(5),
+        randomAlphaOfLength(5),
+        randomAlphaOfLength(5),
+        randomAlphaOfLength(5)
+    );
+
     protected Settings nodeSettings(int nodeOrdinal) {
         if (segmentRepoPath == null || translogRepoPath == null) {
             segmentRepoPath = randomRepoPath().toAbsolutePath();
@@ -47,4 +66,103 @@ protected Settings nodeSettings(int nodeOrdinal) {
     protected Settings featureFlagSettings() {
         return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
     }
+
+    public void initDocRepToRemoteMigration() {
+        assertTrue(internalCluster().client().admin().cluster().prepareUpdateSettings().setPersistentSettings(
+            Settings.builder()
+                .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
+                .put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")
+        ).get().isAcknowledged());
+    }
+
+    public BulkResponse indexBulk(String indexName, int numDocs) {
+        BulkRequest bulkRequest = new BulkRequest();
+        for (int i = 0; i < numDocs; i++) {
+            final IndexRequest request = client().prepareIndex(indexName)
+                .setId(UUIDs.randomBase64UUID())
+                .setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5))
+                .request();
+            bulkRequest.add(request);
+        }
+        return client().bulk(bulkRequest).actionGet();
+    }
+
+    public class AsyncIndexingService {
+        private AtomicBoolean finished = new AtomicBoolean();
+        private AtomicInteger numAutoGenDocs = new AtomicInteger();
+        private Thread indexingThread;
+        private String indexName;
+
+        AsyncIndexingService(String indexName) {
+            this(indexName, Integer.MAX_VALUE);
+        }
+
+        AsyncIndexingService(String indexName, int maxDocs) {
+            indexingThread = new Thread(() -> {
+                while (finished.get() == false && numAutoGenDocs.get() < maxDocs) {
+                    IndexResponse indexResponse = client().prepareIndex(indexName).setId("id").setSource("field", "value").get();
+                    assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
+                    DeleteResponse deleteResponse = client().prepareDelete(indexName, "id").get();
+                    assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
+                    client().prepareIndex(indexName).setSource("auto", true).get();
+                    numAutoGenDocs.incrementAndGet();
+                    logger.info("Indexed {} docs here", numAutoGenDocs.get());
+                }
+            });
+        }
+
+        public void stopIndexing() throws InterruptedException {
+            finished.set(true);
+
indexingThread.join(); + } + + public int totalIndexedDocs() { + return numAutoGenDocs.get(); + } + + public void startIndexing() { + indexingThread.start(); + } + + public Thread getIndexingThread() { + return indexingThread; + } + } + + public class SyncIndexingService { + private int maxDocs; + private int currentIndexedDocs; + private boolean forceStop; + private String indexName; + + SyncIndexingService(String indexName) { + this(indexName, Integer.MAX_VALUE); + } + + SyncIndexingService(String indexName, int maxDocs) { + this.indexName = indexName; + this.maxDocs = maxDocs; + this.forceStop = false; + } + + public void forceStopIndexing() throws InterruptedException { + this.forceStop = true; + } + + public int getCurrentIndexedDocs() { + return currentIndexedDocs; + } + + public void startIndexing() { + while (currentIndexedDocs < maxDocs && forceStop == false) { + IndexResponse indexResponse = client().prepareIndex(indexName).setId("id").setSource("field", "value").get(); + assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); + DeleteResponse deleteResponse = client().prepareDelete(indexName, "id").get(); + assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); + client().prepareIndex(indexName).setSource("auto", true).get(); + currentIndexedDocs += 1; + logger.info("Indexed {} docs here", currentIndexedDocs); + } + } + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java index 3fbd628a1ad74..f2bc123dc0f15 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java @@ -10,13 +10,10 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers; -import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; -import org.opensearch.action.delete.DeleteResponse; -import org.opensearch.action.index.IndexResponse; import org.opensearch.client.Client; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.common.Priority; @@ -28,8 +25,6 @@ import org.opensearch.test.junit.annotations.TestLogging; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -61,20 +56,8 @@ public void testMixedModeRelocation() throws Exception { client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get(); ensureGreen("test"); - AtomicInteger numAutoGenDocs = new AtomicInteger(); - final AtomicBoolean finished = new AtomicBoolean(false); - Thread indexingThread = new Thread(() -> { - while (finished.get() == false && numAutoGenDocs.get() < 100) { - IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get(); - 
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); - DeleteResponse deleteResponse = client().prepareDelete("test", "id").get(); - assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); - client().prepareIndex("test").setSource("auto", true).get(); - numAutoGenDocs.incrementAndGet(); - logger.info("Indexed {} docs here", numAutoGenDocs.get()); - } - }); - indexingThread.start(); + AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test", 100); + asyncIndexingService.startIndexing(); refresh("test"); @@ -126,16 +109,15 @@ public void testMixedModeRelocation() throws Exception { .execute() .actionGet(); } - finished.set(true); - indexingThread.join(); + asyncIndexingService.stopIndexing(); refresh("test"); - OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get()); + OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), asyncIndexingService.totalIndexedDocs()); OpenSearchAssertions.assertHitCount( client().prepareSearch("test") .setTrackTotalHits(true)// extra paranoia ;) .setQuery(QueryBuilders.termQuery("auto", true)) .get(), - numAutoGenDocs.get() + asyncIndexingService.totalIndexedDocs() ); } diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index a7a13afd2597c..612d169f5b779 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -442,7 +442,7 @@ protected long primaryOperationSize(BulkShardRequest request) { @Override public ReplicationMode getReplicationMode(IndexShard indexShard) { - if (indexShard.isRemoteTranslogEnabled()) { + if (indexShard.isRemoteTranslogEnabled() || indexShard.routingEntry().isAssignedToRemoteStoreNode()) { return ReplicationMode.PRIMARY_TERM_VALIDATION; } return super.getReplicationMode(indexShard); diff --git a/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java b/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java index 189bc82348a0c..07e32a2394b53 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java +++ b/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java @@ -9,6 +9,7 @@ package org.opensearch.action.support.replication; import org.opensearch.action.support.replication.ReplicationOperation.ReplicaResponse; +import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.core.action.ActionListener; @@ -60,16 +61,21 @@ protected void performOnReplicaProxy( @Override ReplicationMode determineReplicationMode(ShardRouting shardRouting, ShardRouting primaryRouting) { - // If the current routing is the primary, then it does not need to be replicated if (shardRouting.isSameAllocation(primaryRouting)) { return ReplicationMode.NO_REPLICATION; } - + // Perform full replication during primary failover if (primaryRouting.relocating() && shardRouting.isSameAllocation(primaryRouting.getTargetRelocatingShard())) { return ReplicationMode.FULL_REPLICATION; } - + /* + Perform full replication if replica is hosted on a non-remote node. 
+ Only applicable during remote migration + */ + if (shardRouting.isAssignedToRemoteStoreNode() == false) { + return ReplicationMode.FULL_REPLICATION; + } return replicationModeOverride; } } diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index 95f998e2d89c2..1ebee535a8d17 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -356,7 +356,7 @@ public void performOn( * @return the overridden replication mode. */ public ReplicationMode getReplicationMode(IndexShard indexShard) { - if (indexShard.isRemoteTranslogEnabled()) { + if (indexShard.isRemoteTranslogEnabled() || indexShard.routingEntry().isAssignedToRemoteStoreNode()) { return ReplicationMode.NO_REPLICATION; } return ReplicationMode.FULL_REPLICATION; @@ -642,7 +642,7 @@ public void handleException(TransportException exp) { primaryRequest.getPrimaryTerm(), initialRetryBackoffBound, retryTimeout, - indexShard.isRemoteTranslogEnabled() + indexShard.isRemoteTranslogEnabled() || indexShard.indexSettings().isRemoteNode() ? new ReplicationModeAwareProxy<>(getReplicationMode(indexShard), replicasProxy, termValidationProxy) : new FanoutReplicationProxy<>(replicasProxy) ).execute(); diff --git a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java index 45de045a8fc69..acca57669a767 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java @@ -75,6 +75,7 @@ public class ShardRouting implements Writeable, ToXContentObject { private final long expectedShardSize; @Nullable private final ShardRouting targetRelocatingShard; + private boolean assignedToRemoteStoreNode; /** * A constructor to internally create shard routing instances, note, the internal flag should only be set to true @@ -878,4 +879,12 @@ public boolean unassignedReasonIndexCreated() { } return false; } + + public boolean isAssignedToRemoteStoreNode() { + return assignedToRemoteStoreNode; + } + + public void setAssignedToRemoteStoreNode(boolean assignedToRemoteStoreNode) { + this.assignedToRemoteStoreNode = assignedToRemoteStoreNode; + } } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 3ae404e76e089..7244932fbcc03 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -556,7 +556,8 @@ public synchronized IndexShard createShard( clusterRemoteTranslogBufferIntervalSupplier, nodeEnv.nodeId(), recoverySettings, - seedRemote + seedRemote, + clusterService ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java index b4c33d781af86..86079e49e0be9 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java @@ -8,7 +8,11 @@ package org.opensearch.index.remote; +import 
org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.node.remotestore.RemoteStoreNodeService; import java.util.Arrays; import java.util.HashMap; @@ -16,6 +20,9 @@ import java.util.Map; import java.util.function.Function; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; + /** * Utils for remote store * @@ -101,4 +108,17 @@ public static void verifyNoMultipleWriters(List mdFiles, Function e.getValue().tracked).allMatch(e -> e.getValue().replicated) - : "In absence of remote translog store, all tracked shards must have replication mode as LOGICAL_REPLICATION"; + : "In absence of remote translog store and no-ongoing remote migration, all tracked shards must have replication mode as LOGICAL_REPLICATION"; return new ReplicationGroup( routingTable, @@ -1248,7 +1254,8 @@ private void createReplicationLagTimers() { if (cps.inSync && replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false && isPrimaryRelocation(allocationId) == false - && latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)) { + && latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint) + && (ongoingEngineMigration && routingTable.getByAllocationId(allocationId).isAssignedToRemoteStoreNode() == true)) { cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> new SegmentReplicationLagTimer()); logger.trace( () -> new ParameterizedMessage( @@ -1446,6 +1453,7 @@ public synchronized void updateFromClusterManager( + " as in-sync but it does not exist locally"; final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; final long globalCheckpoint = localCheckpoint; + final boolean assignedToRemoteStoreNode = routingTable.getByAllocationId(initializingId).isAssignedToRemoteStoreNode(); checkpoints.put( initializingId, new CheckpointState( @@ -1453,7 +1461,7 @@ public synchronized void updateFromClusterManager( globalCheckpoint, inSync, inSync, - isReplicated(initializingId, primaryAllocationId, primaryTargetAllocationId) + isReplicated(initializingId, primaryAllocationId, primaryTargetAllocationId, assignedToRemoteStoreNode) ) ); } @@ -1465,6 +1473,7 @@ public synchronized void updateFromClusterManager( for (String initializingId : initializingAllocationIds) { final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; final long globalCheckpoint = localCheckpoint; + final boolean assignedToRemoteStoreNode = routingTable.getByAllocationId(initializingId).isAssignedToRemoteStoreNode(); checkpoints.put( initializingId, new CheckpointState( @@ -1472,13 +1481,16 @@ public synchronized void updateFromClusterManager( globalCheckpoint, false, false, - isReplicated(initializingId, primaryAllocationId, primaryTargetAllocationId) + isReplicated(initializingId, primaryAllocationId, primaryTargetAllocationId, assignedToRemoteStoreNode) ) ); } for (String inSyncId : inSyncAllocationIds) { final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; final long globalCheckpoint = localCheckpoint; + // Handling cases where node leaves the cluster but the insyncAids are not yet updated + final ShardRouting shardRouting = routingTable.getByAllocationId(inSyncId); + final boolean assignedToRemoteStoreNode = shardRouting != null && 
shardRouting.isAssignedToRemoteStoreNode();
             checkpoints.put(
                 inSyncId,
                 new CheckpointState(
@@ -1486,7 +1498,7 @@
                     globalCheckpoint,
                     true,
                     true,
-                    isReplicated(inSyncId, primaryAllocationId, primaryTargetAllocationId)
+                    isReplicated(inSyncId, primaryAllocationId, primaryTargetAllocationId, assignedToRemoteStoreNode)
                 )
             );
         }
@@ -1511,12 +1523,13 @@
      * @param primaryTargetAllocationId primary target allocation id
      * @return the replication mode.
      */
-    private boolean isReplicated(String allocationId, String primaryAllocationId, String primaryTargetAllocationId) {
-        // If remote translog is enabled, then returns replication mode checking current allocation id against the
-        // primary and primary target allocation id.
-        // If remote translog is enabled, then returns true if given allocation id matches the primary or it's relocation target allocation
-        // id.
-        if (indexSettings().isRemoteTranslogStoreEnabled()) {
+    private boolean isReplicated(String allocationId, String primaryAllocationId, String primaryTargetAllocationId, boolean assignedToRemoteStoreNode) {
+        /*
+        - If remote translog is enabled, replication mode is determined by checking the current allocation id against the primary and primary target allocation ids.
+        - If remote translog is enabled, returns true only if the given allocation id matches the primary or its relocation target allocation id.
+        - During an ongoing remote migration, the above checks are applied only when the shard copy is assigned to a remote store backed node.
+        */
+        if (indexSettings().isRemoteTranslogStoreEnabled() || (ongoingEngineMigration && assignedToRemoteStoreNode)) {
             return (allocationId.equals(primaryAllocationId) || allocationId.equals(primaryTargetAllocationId));
         }
         // For other case which is local translog, return true as the requests are replicated to all shards in the replication group.
diff --git a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
index 5fa0a1a6459e7..b5fa2bd551e02 100644
--- a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
+++ b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
@@ -39,10 +39,11 @@
 import org.opensearch.ExceptionsHelper;
 import org.opensearch.action.support.ActionFilters;
 import org.opensearch.action.support.ActiveShardCount;
+import org.opensearch.action.support.replication.ReplicationMode;
 import org.opensearch.action.support.replication.ReplicationRequest;
 import org.opensearch.action.support.replication.ReplicationResponse;
 import org.opensearch.action.support.replication.ReplicationTask;
 import org.opensearch.action.support.replication.TransportReplicationAction;
 import org.opensearch.cluster.action.shard.ShardStateAction;
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.inject.Inject;
@@ -198,6 +199,20 @@ protected void shardOperationOnReplica(Request request, IndexShard replica, Acti
         });
     }
 
+    @Override
+    public ReplicationMode getReplicationMode(IndexShard indexShard) {
+        /*
+        Unblock PRRL publication on both docrep and remote shard copies
+        during remote store migration. This is done deliberately to ensure
+        data consistency on remote to docrep shard copy failover during the
+        migration process.
+        */
+        if (indexShard.ongoingEngineMigration()) {
+            return ReplicationMode.FULL_REPLICATION;
+        }
+        return super.getReplicationMode(indexShard);
+    }
+
     /**
      * Request for retention lease background sync action
      *
diff --git a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java
index ca3c7e1d49700..4c449ed7338d5 100644
--- a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java
+++ b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java
@@ -40,10 +40,11 @@
 import org.opensearch.action.support.ActionFilters;
 import org.opensearch.action.support.ActiveShardCount;
 import org.opensearch.action.support.WriteResponse;
 import org.opensearch.action.support.replication.ReplicatedWriteRequest;
+import org.opensearch.action.support.replication.ReplicationMode;
 import org.opensearch.action.support.replication.ReplicationResponse;
 import org.opensearch.action.support.replication.ReplicationTask;
 import org.opensearch.action.support.replication.TransportWriteAction;
 import org.opensearch.cluster.action.shard.ShardStateAction;
 import org.opensearch.cluster.block.ClusterBlockLevel;
 import org.opensearch.cluster.service.ClusterService;
@@ -209,6 +210,15 @@ protected void dispatchedShardOperationOnReplica(Request request, IndexShard rep
         });
     }
 
+    @Override
+    public ReplicationMode getReplicationMode(IndexShard indexShard) {
+        // Unblock PRRL publication during remote store migration
+        if (indexShard.ongoingEngineMigration()) {
+            return ReplicationMode.FULL_REPLICATION;
+        }
+        return super.getReplicationMode(indexShard);
+    }
+
     @Override
     public ClusterBlockLevel indexBlockLevel() {
         return null;
diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java
index 675d60ec2b63d..1ed63796655df 100644
--- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java
+++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java
@@ -43,7 +43,14 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
         if (didRefresh
             && shard.state() == IndexShardState.STARTED
             && shard.getReplicationTracker().isPrimaryMode()
-            && !shard.indexSettings.isSegRepWithRemoteEnabled()) {
+            && !shard.indexSettings.isSegRepWithRemoteEnabled()
+            /*
+            During remote store migration, the isSegRepWithRemoteEnabled check would return false
+            since we do not alter the remote store based index settings at that stage.
Explicitly + blocking checkpoint publication from this refresh listener since it ends up interfering + with the RemoteStoreRefreshListener invocation + */ + && !shard.ongoingEngineMigration()) { publisher.publish(shard, shard.getLatestReplicationCheckpoint()); } return true; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 28299c0275b23..ac389f0adfa9a 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -73,6 +73,7 @@ import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Booleans; import org.opensearch.common.CheckedConsumer; import org.opensearch.common.CheckedFunction; @@ -150,6 +151,7 @@ import org.opensearch.index.refresh.RefreshStats; import org.opensearch.index.remote.RemoteSegmentStats; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; +import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.search.stats.ShardSearchStats; import org.opensearch.index.seqno.ReplicationTracker; @@ -348,6 +350,7 @@ Runnable getGlobalCheckpointSyncer() { private final RemoteStoreFileDownloader fileDownloader; private final RecoverySettings recoverySettings; private final RemoteMigrationShardState remoteMigrationShardState; + private final ClusterService clusterService; public IndexShard( final ShardRouting shardRouting, @@ -377,7 +380,8 @@ public IndexShard( final Supplier clusterRemoteTranslogBufferIntervalSupplier, final String nodeId, @Nullable final RecoverySettings recoverySettings, - boolean seedRemote + boolean seedRemote, + ClusterService clusterService ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -478,6 +482,7 @@ public boolean shouldCache(Query query) { this.remoteMigrationShardState = (indexSettings.isRemoteNode() && indexSettings.isRemoteStoreEnabled() == false) ? new RemoteMigrationShardState(seedRemote) : null; + this.clusterService = clusterService; } public ThreadPool getThreadPool() { @@ -632,6 +637,7 @@ public void updateShardState( if (newRouting.primary()) { replicationTracker.updateFromClusterManager(applyingClusterStateVersion, inSyncAllocationIds, routingTable); + replicationTracker.setOngoingEngineMigration(ongoingEngineMigration()); } if (state == IndexShardState.POST_RECOVERY && newRouting.active()) { @@ -3318,7 +3324,7 @@ public void syncRetentionLeases() { ) ); } else { - logger.trace("background syncing retention leases [{}] after expiration check", retentionLeases.v2()); + logger.info("background syncing retention leases [{}] after expiration check", retentionLeases.v2()); retentionLeaseSyncer.backgroundSync( shardId, shardRouting.allocationId().getId(), @@ -3479,7 +3485,7 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final S * update local checkpoint at replica, so the local checkpoint at replica can be less than globalCheckpoint. 
         */
         assert (state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED)
-            || indexSettings.isRemoteTranslogStoreEnabled() : "supposedly in-sync shard copy received a global checkpoint ["
+            || indexSettings.isRemoteTranslogStoreEnabled() || indexSettings.isRemoteNode() : "supposedly in-sync shard copy received a global checkpoint ["
                 + globalCheckpoint
                 + "] "
                 + "that is higher than its local checkpoint ["
@@ -3518,6 +3524,7 @@ assert getLocalCheckpoint() == primaryContext.getCheckpointStates().get(allocati
                 + "]";
         synchronized (mutex) {
             replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
+            replicationTracker.setOngoingEngineMigration(ongoingEngineMigration());
         }
         postActivatePrimaryMode();
     }
@@ -4594,6 +4601,9 @@ public final boolean isSearchIdleSupported() {
         if (isRemoteTranslogEnabled() || indexSettings.isRemoteNode()) {
             return false;
         }
+        if (ongoingEngineMigration()) {
+            return false;
+        }
         return indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0;
     }
@@ -5199,6 +5209,10 @@ public AsyncIOProcessor<Translog.Location> getTranslogSyncProcessor() {
         return translogSyncProcessor;
     }
+    public boolean ongoingEngineMigration() {
+        return RemoteStoreUtils.isMigrationDirectionSet(clusterService);
+    }
+
     static class RemoteMigrationShardState {
         // Set to true for any primary shard started on remote backed node relocating from docrep node
diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
index 7bb80b736693f..483bf1587c09e 100644
--- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
+++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -340,7 +340,7 @@ private void onSuccessfulSegmentsSync(
         resetBackOffDelayIterator();
         // Set the minimum sequence number for keeping translog
         indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
-        // Publishing the new checkpoint which is used for remote store + segrep indexes
+        // Publishing the new checkpoint which is used for remote store + segrep indexes
         checkpointPublisher.publish(indexShard, checkpoint);
         logger.debug("onSuccessfulSegmentsSync lastRefreshedCheckpoint={} checkpoint={}", lastRefreshedCheckpoint, checkpoint);
     }
diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java
index 7fb8b172ae352..ce16226e97602 100644
--- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java
+++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java
@@ -67,6 +67,7 @@
 import org.opensearch.index.IndexService;
 import org.opensearch.index.IndexSettings;
 import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
+import org.opensearch.index.remote.RemoteStoreUtils;
 import org.opensearch.index.seqno.GlobalCheckpointSyncAction;
 import org.opensearch.index.seqno.ReplicationTracker;
 import org.opensearch.index.seqno.RetentionLeaseSyncer;
@@ -708,6 +709,7 @@ private void updateShard(
         primaryTerm = indexMetadata.primaryTerm(shard.shardId().id());
         final Set<String> inSyncIds = indexMetadata.inSyncAllocationIds(shard.shardId().id());
         final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId());
+
updateShardRoutingWithNode(indexShardRoutingTable, nodes); shard.updateShardState( shardRouting, primaryTerm, @@ -1096,4 +1098,14 @@ enum IndexRemovalReason { REOPENED, } } + + private void updateShardRoutingWithNode(IndexShardRoutingTable indexShardRoutingTable, DiscoveryNodes nodes) { + for (ShardRouting routing : indexShardRoutingTable.assignedShards()) { + if (routing.relocating()) { + final ShardRouting targetRouting = routing.getTargetRelocatingShard(); + targetRouting.setAssignedToRemoteStoreNode(nodes.get(targetRouting.currentNodeId()).isRemoteStoreNode()); + } + routing.setAssignedToRemoteStoreNode(nodes.get(routing.currentNodeId()).isRemoteStoreNode()); + } + } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java index 0ccb1ac2133cf..eff43ea6ca407 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java @@ -24,7 +24,7 @@ public static RecoverySourceHandler create( RecoverySettings recoverySettings ) { boolean isReplicaRecoveryWithRemoteTranslog = request.isPrimaryRelocation() == false - && (shard.isRemoteTranslogEnabled() || shard.isMigratingToRemote()); + && (shard.isRemoteTranslogEnabled() || shard.isMigratingToRemote()) && request.targetNode().isRemoteStoreNode(); if (isReplicaRecoveryWithRemoteTranslog) { return new RemoteStorePeerRecoverySourceHandler( shard, diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index 821ae42e31881..f5cf18b20c206 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -98,7 +98,7 @@ protected void doExecute(Task task, PublishCheckpointRequest request, ActionList @Override public ReplicationMode getReplicationMode(IndexShard indexShard) { - if (indexShard.isRemoteTranslogEnabled()) { + if (indexShard.isRemoteTranslogEnabled() || indexShard.routingEntry().isAssignedToRemoteStoreNode()) { return ReplicationMode.FULL_REPLICATION; } return super.getReplicationMode(indexShard); @@ -199,6 +199,11 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh Objects.requireNonNull(replica); ActionListener.completeWith(listener, () -> { logger.trace(() -> new ParameterizedMessage("Checkpoint {} received on replica {}", request, replica.shardId())); + // Ignore replica operation if there is an ongoing remote store migration and the replica copy is assigned to a docrep enabled node + if (replica.ongoingEngineMigration() == true && replica.routingEntry().isAssignedToRemoteStoreNode() == false) { + logger.trace("Received segrep checkpoint on a docrep shard copy during an ongoing remote migration. 
NoOp."); + return new ReplicaResult(); + } if (request.getCheckpoint().getShardId().equals(replica.shardId())) { replicationService.onNewCheckpoint(request.getCheckpoint(), replica); } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index bd7a9f1db9236..46fc57a0fb8a2 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -64,6 +64,7 @@ import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.fs.FsBlobStore; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.io.PathUtils; import org.opensearch.common.lease.Releasable; @@ -680,6 +681,7 @@ protected IndexShard newShard( } return new InternalTranslogFactory(); }; + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(); indexShard = new IndexShard( routing, indexSettings, @@ -708,7 +710,8 @@ protected IndexShard newShard( () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, "dummy-node", DefaultRecoverySettings.INSTANCE, - false + false, + clusterService ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) {