diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f439e48ecab7..2b9a11dc55723 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Changed - Indexed IP field supports `terms_query` with more than 1025 IP masks [#16391](https://github.com/opensearch-project/OpenSearch/pull/16391) - Make entries for dependencies from server/build.gradle to gradle version catalog ([#16707](https://github.com/opensearch-project/OpenSearch/pull/16707)) +- Limit reader writer separation to remote store enabled clusters [#16760](https://github.com/opensearch-project/OpenSearch/pull/16760) ### Deprecated - Performing update operation with default pipeline or final pipeline is deprecated ([#16712](https://github.com/opensearch-project/OpenSearch/pull/16712)) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java index 5f65d6647f26d..df2620b794686 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java @@ -14,6 +14,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; import org.opensearch.test.OpenSearchIntegTestCase; import java.util.List; @@ -23,7 +24,7 @@ import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class SearchReplicaFilteringAllocationIT extends 
OpenSearchIntegTestCase { +public class SearchReplicaFilteringAllocationIT extends RemoteStoreBaseIntegTestCase { @Override protected Settings featureFlagSettings() { diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java index f660695af9965..1341c02910d1e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java @@ -8,53 +8,49 @@ package org.opensearch.indices.replication; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.After; -import org.junit.Before; import java.nio.file.Path; import java.util.List; import java.util.Set; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; + @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT { 
private static final String REPOSITORY_NAME = "test-remote-store-repo"; protected Path absolutePath; - private Boolean useRemoteStore; - - @Before - public void randomizeRemoteStoreEnabled() { - useRemoteStore = randomBoolean(); - } - @Override protected Settings nodeSettings(int nodeOrdinal) { - if (useRemoteStore) { - if (absolutePath == null) { - absolutePath = randomRepoPath().toAbsolutePath(); - } - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath)) - .build(); + if (absolutePath == null) { + absolutePath = randomRepoPath().toAbsolutePath(); } - return super.nodeSettings(nodeOrdinal); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath)) + .build(); } @After public void teardown() { - if (useRemoteStore) { - clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); - } + clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); + } @Override @@ -62,7 +58,7 @@ public Settings indexSettings() { return Settings.builder() .put(super.indexSettings()) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) .build(); } @@ -88,6 +84,128 @@ public void testReplication() throws Exception { waitForSearchableDocs(docCount, primary, replica); } + public void testRecoveryAfterDocsIndexed() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final String primary = internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final int docCount = 10; + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + + final String replica = internalCluster().startDataOnlyNode(); + 
ensureGreen(INDEX_NAME); + assertDocCounts(10, replica); + + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0)) + .get(); + + ensureGreen(INDEX_NAME); + + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)) + .get(); + ensureGreen(INDEX_NAME); + assertDocCounts(10, replica); + + internalCluster().restartNode(replica); + ensureGreen(INDEX_NAME); + assertDocCounts(10, replica); + } + + public void testStopPrimary_RestoreOnNewNode() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final String primary = internalCluster().startDataOnlyNode(); + createIndex( + INDEX_NAME, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .build() + ); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final int docCount = 10; + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + assertDocCounts(docCount, primary); + + final String replica = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + assertDocCounts(docCount, replica); + // stop the primary + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); + + assertBusy(() -> { + ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(INDEX_NAME).get(); + assertEquals(ClusterHealthStatus.RED, clusterHealthResponse.getStatus()); + }); + assertDocCounts(docCount, replica); + + String restoredPrimary = internalCluster().startDataOnlyNode(); + + client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture()); + ensureGreen(INDEX_NAME); + 
assertDocCounts(docCount, replica, restoredPrimary); + + for (int i = docCount; i < docCount * 2; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + assertBusy(() -> assertDocCounts(20, replica, restoredPrimary)); + } + + public void testFailoverToNewPrimaryWithPollingReplication() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final String primary = internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final int docCount = 10; + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + + final String replica = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + assertDocCounts(10, replica); + + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1)) + .get(); + final String writer_replica = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + // stop the primary + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); + + assertBusy(() -> { + ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(INDEX_NAME).get(); + assertEquals(ClusterHealthStatus.YELLOW, clusterHealthResponse.getStatus()); + }); + ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(INDEX_NAME).get(); + assertEquals(ClusterHealthStatus.YELLOW, clusterHealthResponse.getStatus()); + assertDocCounts(10, replica); + + for (int i = docCount; i < docCount * 2; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + assertBusy(() -> assertDocCounts(20, replica, writer_replica)); + } + public void 
testSegmentReplicationStatsResponseWithSearchReplica() throws Exception { internalCluster().startClusterManagerOnlyNode(); final List nodes = internalCluster().startDataOnlyNodes(2); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaRestoreIT.java index 352332b962c92..e8d65e07c7dd9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaRestoreIT.java @@ -15,7 +15,7 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.query.QueryBuilders; import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; +import org.opensearch.remotestore.RemoteSnapshotIT; import org.opensearch.snapshots.SnapshotRestoreException; import org.opensearch.test.OpenSearchIntegTestCase; @@ -26,7 +26,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class SearchReplicaRestoreIT extends AbstractSnapshotIntegTestCase { +public class SearchReplicaRestoreIT extends RemoteSnapshotIT { private static final String INDEX_NAME = "test-idx-1"; private static final String RESTORED_INDEX_NAME = INDEX_NAME + "-restored"; @@ -40,49 +40,6 @@ protected Settings featureFlagSettings() { return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build(); } - public void testSearchReplicaRestore_WhenSnapshotOnDocRep_RestoreOnDocRepWithSearchReplica() throws Exception { - bootstrapIndexWithOutSearchReplicas(ReplicationType.DOCUMENT); - createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME); - - 
SnapshotRestoreException exception = expectThrows( - SnapshotRestoreException.class, - () -> restoreSnapshot( - REPOSITORY_NAME, - SNAPSHOT_NAME, - INDEX_NAME, - RESTORED_INDEX_NAME, - Settings.builder() - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) - .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) - .build() - ) - ); - assertTrue(exception.getMessage().contains(getSnapshotExceptionMessage(ReplicationType.DOCUMENT, ReplicationType.DOCUMENT))); - } - - public void testSearchReplicaRestore_WhenSnapshotOnDocRep_RestoreOnSegRepWithSearchReplica() throws Exception { - bootstrapIndexWithOutSearchReplicas(ReplicationType.DOCUMENT); - createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME); - - restoreSnapshot( - REPOSITORY_NAME, - SNAPSHOT_NAME, - INDEX_NAME, - RESTORED_INDEX_NAME, - Settings.builder() - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) - .build() - ); - ensureYellowAndNoInitializingShards(RESTORED_INDEX_NAME); - internalCluster().startDataOnlyNode(); - ensureGreen(RESTORED_INDEX_NAME); - assertEquals(1, getNumberOfSearchReplicas(RESTORED_INDEX_NAME)); - - SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); - assertHitCount(resp, DOC_COUNT); - } - public void testSearchReplicaRestore_WhenSnapshotOnSegRep_RestoreOnDocRepWithSearchReplica() throws Exception { bootstrapIndexWithOutSearchReplicas(ReplicationType.SEGMENT); createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME); @@ -140,27 +97,6 @@ public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_Resto assertTrue(exception.getMessage().contains(getSnapshotExceptionMessage(ReplicationType.SEGMENT, ReplicationType.DOCUMENT))); } - public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_RestoreOnDocRepWithNoSearchReplica() throws Exception { - 
bootstrapIndexWithSearchReplicas(); - createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME); - - restoreSnapshot( - REPOSITORY_NAME, - SNAPSHOT_NAME, - INDEX_NAME, - RESTORED_INDEX_NAME, - Settings.builder() - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) - .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0) - .build() - ); - ensureGreen(RESTORED_INDEX_NAME); - assertEquals(0, getNumberOfSearchReplicas(RESTORED_INDEX_NAME)); - - SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); - assertHitCount(resp, DOC_COUNT); - } - private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType) throws InterruptedException { startCluster(2); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java index fa836e2cc5784..f524f4d1298c1 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java @@ -20,6 +20,7 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.query.QueryBuilders; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; @@ -31,7 +32,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class SearchOnlyReplicaIT extends OpenSearchIntegTestCase { +public class SearchOnlyReplicaIT extends RemoteStoreBaseIntegTestCase { private static final String TEST_INDEX = "test_index"; @@ -55,35 +56,6 @@ public Settings indexSettings() { 
.build(); } - public void testCreateDocRepFails() { - Settings settings = Settings.builder().put(indexSettings()).put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); - - IllegalArgumentException illegalArgumentException = expectThrows( - IllegalArgumentException.class, - () -> createIndex(TEST_INDEX, settings) - ); - assertEquals(expectedFailureMessage, illegalArgumentException.getMessage()); - } - - public void testUpdateDocRepFails() { - Settings settings = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) - .build(); - // create succeeds - createIndex(TEST_INDEX, settings); - - // update fails - IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class, () -> { - client().admin() - .indices() - .prepareUpdateSettings(TEST_INDEX) - .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)) - .get(); - }); - assertEquals(expectedFailureMessage, illegalArgumentException.getMessage()); - } - public void testFailoverWithSearchReplica_WithWriterReplicas() throws IOException { int numSearchReplicas = 1; int numWriterReplicas = 1; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index 232201d18ba13..b5b2b71f977fa 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -1096,14 +1096,9 @@ static Settings aggregateIndexSettings( private static void updateSearchOnlyReplicas(Settings requestSettings, Settings.Builder builder) { if (INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.exists(builder) && builder.get(SETTING_NUMBER_OF_SEARCH_REPLICAS) != null) { if (INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings) > 0 - && 
ReplicationType.parseString(builder.get(INDEX_REPLICATION_TYPE_SETTING.getKey())).equals(ReplicationType.DOCUMENT)) { + && Boolean.parseBoolean(builder.get(SETTING_REMOTE_STORE_ENABLED)) == false) { throw new IllegalArgumentException( - "To set " - + SETTING_NUMBER_OF_SEARCH_REPLICAS - + ", " - + INDEX_REPLICATION_TYPE_SETTING.getKey() - + " must be set to " - + ReplicationType.SEGMENT + "To set " + SETTING_NUMBER_OF_SEARCH_REPLICAS + ", " + SETTING_REMOTE_STORE_ENABLED + " must be set to true" ); } builder.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings)); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java index 8c350d6b9cef5..a35af0e607c31 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -63,7 +63,6 @@ import org.opensearch.index.IndexSettings; import org.opensearch.indices.IndicesService; import org.opensearch.indices.ShardLimitValidator; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -77,8 +76,8 @@ import java.util.Set; import static org.opensearch.action.support.ContextPreservingActionListener.wrapPreservingContext; -import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateOverlap; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateRefreshIntervalSettings; import static 
org.opensearch.cluster.metadata.MetadataCreateIndexService.validateTranslogDurabilitySettings; @@ -538,14 +537,12 @@ public ClusterState execute(ClusterState currentState) { private void validateSearchReplicaCountSettings(Settings requestSettings, Index[] indices, ClusterState currentState) { final int updatedNumberOfSearchReplicas = IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings); if (updatedNumberOfSearchReplicas > 0) { - if (Arrays.stream(indices).allMatch(index -> currentState.metadata().isSegmentReplicationEnabled(index.getName())) == false) { + if (Arrays.stream(indices) + .allMatch( + index -> currentState.metadata().index(index.getName()).getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false) + ) == false) { throw new IllegalArgumentException( - "To set " - + SETTING_NUMBER_OF_SEARCH_REPLICAS - + ", " - + INDEX_REPLICATION_TYPE_SETTING.getKey() - + " must be set to " - + ReplicationType.SEGMENT + "To set " + SETTING_NUMBER_OF_SEARCH_REPLICAS + ", " + SETTING_REMOTE_STORE_ENABLED + " must be set to true" ); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java index b4592659bb70f..d2c7c7ce71953 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java @@ -149,7 +149,10 @@ boolean validate(Metadata metadata) { "Shard [" + indexShardRoutingTable.shardId().id() + "] routing table has wrong number of replicas, expected [" + + "Replicas: " + indexMetadata.getNumberOfReplicas() + + ", Search Replicas: " + + indexMetadata.getNumberOfSearchOnlyReplicas() + "], got [" + routingNumberOfReplicas + "]" @@ -514,6 +517,16 @@ public Builder initializeAsRemoteStoreRestore( ShardRouting.newUnassigned(shardId, false, PeerRecoverySource.INSTANCE, unassignedInfo) ); } + // if writers are red we do not want to re-recover 
search only shards if already assigned. + for (ShardRouting shardRouting : indexShardRoutingTable.searchOnlyReplicas()) { + if (shardRouting.unassigned()) { + indexShardRoutingBuilder.addShard( + ShardRouting.newUnassigned(shardId, false, true, EmptyStoreRecoverySource.INSTANCE, unassignedInfo) + ); + } else { + indexShardRoutingBuilder.addShard(shardRouting); + } + } } else { // Primary is either active or initializing. Do not trigger restore. indexShardRoutingBuilder.addShard(indexShardRoutingTable.primaryShard()); @@ -575,13 +588,7 @@ private Builder initializeAsRestore( } for (int i = 0; i < indexMetadata.getNumberOfSearchOnlyReplicas(); i++) { indexShardRoutingBuilder.addShard( - ShardRouting.newUnassigned( - shardId, - false, - true, - PeerRecoverySource.INSTANCE, // TODO: Update to remote store if enabled - unassignedInfo - ) + ShardRouting.newUnassigned(shardId, false, true, EmptyStoreRecoverySource.INSTANCE, unassignedInfo) ); } shards.put(shardNumber, indexShardRoutingBuilder.build()); @@ -624,13 +631,7 @@ private Builder initializeEmpty(IndexMetadata indexMetadata, UnassignedInfo unas } for (int i = 0; i < indexMetadata.getNumberOfSearchOnlyReplicas(); i++) { indexShardRoutingBuilder.addShard( - ShardRouting.newUnassigned( - shardId, - false, - true, - PeerRecoverySource.INSTANCE, // TODO: Update to remote store if enabled - unassignedInfo - ) + ShardRouting.newUnassigned(shardId, false, true, EmptyStoreRecoverySource.INSTANCE, unassignedInfo) ); } shards.put(shardNumber, indexShardRoutingBuilder.build()); @@ -665,7 +666,7 @@ public Builder addSearchReplica() { shardId, false, true, - PeerRecoverySource.INSTANCE, // TODO: Change to remote store if enabled + EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.REPLICA_ADDED, null) ); shards.put(shardNumber, new IndexShardRoutingTable.Builder(shards.get(shard.id())).addShard(shard).build()); diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java 
b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index f25cb14f65eca..1530d14d4558d 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -134,6 +134,23 @@ public class IndexShardRoutingTable extends AbstractDiffable assignedShards = newShardRoutingTable.assignedShards() .stream() .filter(s -> s.isRelocationTarget() == false) + .filter(s -> s.isSearchOnly() == false) // do not consider search only shards for in sync validation .collect(Collectors.toList()); assert assignedShards.size() <= maxActiveShards : "cannot have more assigned shards " + assignedShards diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index 4bde1e282fe78..a60bb1850b440 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -190,6 +190,10 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return allocation.decision(YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries); } } else { + if (shardRouting.isSearchOnly()) { + // skip any shard type throttling for search replicas and rely only on concurrent incoming node recovery limits. 
+ return allocation.decision(YES, NAME, "Do not throttle search replica recovery"); + } // Peer recovery assert initializingShard(shardRouting, node.nodeId()).recoverySource().getType() == RecoverySource.Type.PEER; diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index 1e43827afeb47..57ade7fa10cd0 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -633,6 +633,7 @@ public synchronized void renewPeerRecoveryRetentionLeases() { */ final boolean renewalNeeded = StreamSupport.stream(routingTable.spliterator(), false) .filter(ShardRouting::assignedToNode) + .filter(r -> r.isSearchOnly() == false) .anyMatch(shardRouting -> { final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)); if (retentionLease == null) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index eb3999718ca5b..c587549bb2a55 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2534,12 +2534,18 @@ public void openEngineAndSkipTranslogRecoveryFromSnapshot() throws IOException { assert routingEntry().recoverySource().getType() == RecoverySource.Type.SNAPSHOT : "not a snapshot recovery [" + routingEntry() + "]"; + openEngineAndSkipTranslogRecovery(false); + } + + void openEngineAndSkipTranslogRecovery(boolean syncFromRemote) throws IOException { recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX); maybeCheckIndex(); recoveryState.setStage(RecoveryState.Stage.TRANSLOG); recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); loadGlobalCheckpointToReplicationTracker(); - innerOpenEngineAndTranslog(replicationTracker, false); + 
innerOpenEngineAndTranslog(replicationTracker, syncFromRemote); + assert routingEntry().isSearchOnly() == false || translogStats().estimatedNumberOfOperations() == 0 + : "Translog is expected to be empty but holds " + translogStats().estimatedNumberOfOperations() + " Operations."; getEngine().translogManager().skipTranslogRecovery(); } @@ -2889,7 +2895,8 @@ public void recoverFromLocalShards( public void recoverFromStore(ActionListener listener) { // we are the first primary, recover from the gateway // if its post api allocation, the index should exists - assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; + assert shardRouting.primary() || shardRouting.isSearchOnly() + : "recover from store only makes sense if the shard is a primary shard or an untracked search only replica"; assert shardRouting.initializing() : "can only start recovery on initializing shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); storeRecovery.recoverFromStore(this, listener); diff --git a/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java b/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java index ccfaf50da1c6b..b2db48737ee3f 100644 --- a/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java +++ b/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java @@ -67,15 +67,17 @@ public ReplicationGroup( this.inSyncAllocationIds = inSyncAllocationIds; this.trackedAllocationIds = trackedAllocationIds; this.version = version; - this.unavailableInSyncShards = Sets.difference(inSyncAllocationIds, routingTable.getAllAllocationIds()); this.replicationTargets = new ArrayList<>(); this.skippedShards = new ArrayList<>(); for (final ShardRouting shard : routingTable) { - // search only replicas never receive any replicated operations if (shard.unassigned() || shard.isSearchOnly()) { assert shard.primary() == false : "primary shard should not be unassigned in a 
replication group: " + shard; skippedShards.add(shard); + if (shard.isSearchOnly()) { + assert shard.allocationId() == null || inSyncAllocationIds.contains(shard.allocationId().getId()) == false + : " Search replicas should not be part of the inSync id set"; + } } else { if (trackedAllocationIds.contains(shard.allocationId().getId())) { replicationTargets.add(shard); diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 6933e4e161dd1..935f0f2eed1d1 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -544,7 +544,7 @@ private boolean canRecover(IndexShard indexShard) { // got closed on us, just ignore this recovery return false; } - if (indexShard.routingEntry().primary() == false) { + if (indexShard.routingEntry().primary() == false && indexShard.routingEntry().isSearchOnly() == false) { throw new IndexShardRecoveryException(shardId, "Trying to recover when the shard is in backup state", null); } return true; @@ -747,7 +747,15 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe writeEmptyRetentionLeasesFile(indexShard); indexShard.recoveryState().getIndex().setFileDetailsComplete(); } - indexShard.openEngineAndRecoverFromTranslog(); + if (indexShard.routingEntry().isSearchOnly() == false) { + indexShard.openEngineAndRecoverFromTranslog(); + } else { + // Opens the engine for pull based replica copies that are + // not primary eligible. This will skip any checkpoint tracking and ensure + // that the shards are sync'd with remote store before opening. 
+ // + indexShard.openEngineAndSkipTranslogRecovery(true); + } if (indexShard.shouldSeedRemoteStore()) { indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(() -> { logger.info("Attempting to seed Remote Store via local recovery for {}", indexShard.shardId()); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java b/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java index 3d11193a07884..81055e01d915b 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java @@ -19,32 +19,46 @@ import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.common.ValidationException; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.env.Environment; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.indices.ShardLimitValidator; import org.opensearch.indices.cluster.ClusterStateChanges; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchSingleNodeTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING; import static 
org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; public class SearchOnlyReplicaTests extends OpenSearchSingleNodeTestCase { + public static final String TEST_RS_REPO = "test-rs-repo"; + public static final String INDEX_NAME = "test-index"; private ThreadPool threadPool; @Before @@ -70,7 +84,7 @@ protected Settings featureFlagSettings() { public void testCreateWithDefaultSearchReplicasSetting() { final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); ClusterState state = createIndexWithSettings(cluster, Settings.builder().build()); - IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index(INDEX_NAME).getShards().get(0); assertEquals(1, indexShardRoutingTable.replicaShards().size()); assertEquals(0, indexShardRoutingTable.searchOnlyReplicas().size()); assertEquals(1, indexShardRoutingTable.writerReplicas().size()); @@ -91,53 +105,50 @@ public void 
testSearchReplicasValidationWithDocumentReplication() { ) ); assertEquals( - "To set index.number_of_search_only_replicas, index.replication.type must be set to SEGMENT", + "To set index.number_of_search_only_replicas, index.remote_store.enabled must be set to true", exception.getCause().getMessage() ); } - public void testUpdateSearchReplicaCount() { - final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); + public void testUpdateSearchReplicaCount() throws ExecutionException, InterruptedException { + Settings settings = Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .build(); + createIndex(INDEX_NAME, settings); - ClusterState state = createIndexWithSettings( - cluster, - Settings.builder() - .put(SETTING_NUMBER_OF_SHARDS, 1) - .put(SETTING_NUMBER_OF_REPLICAS, 0) - .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) - .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) - .build() - ); - assertTrue(state.metadata().hasIndex("index")); - rerouteUntilActive(state, cluster); - IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable(); assertEquals(1, indexShardRoutingTable.replicaShards().size()); assertEquals(1, indexShardRoutingTable.searchOnlyReplicas().size()); assertEquals(0, indexShardRoutingTable.writerReplicas().size()); // add another replica - state = cluster.updateSettings( - state, - new UpdateSettingsRequest("index").settings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2).build()) + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(INDEX_NAME).settings( + Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2).build() ); - rerouteUntilActive(state, cluster); - 
indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + client().admin().indices().updateSettings(updateSettingsRequest).get(); + indexShardRoutingTable = getIndexShardRoutingTable(); assertEquals(2, indexShardRoutingTable.replicaShards().size()); assertEquals(2, indexShardRoutingTable.searchOnlyReplicas().size()); assertEquals(0, indexShardRoutingTable.writerReplicas().size()); // remove all replicas - state = cluster.updateSettings( - state, - new UpdateSettingsRequest("index").settings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0).build()) + updateSettingsRequest = new UpdateSettingsRequest(INDEX_NAME).settings( + Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0).build() ); - rerouteUntilActive(state, cluster); - indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + client().admin().indices().updateSettings(updateSettingsRequest).get(); + indexShardRoutingTable = getIndexShardRoutingTable(); assertEquals(0, indexShardRoutingTable.replicaShards().size()); assertEquals(0, indexShardRoutingTable.searchOnlyReplicas().size()); assertEquals(0, indexShardRoutingTable.writerReplicas().size()); } + private IndexShardRoutingTable getIndexShardRoutingTable() { + return client().admin().cluster().prepareState().get().getState().getRoutingTable().index(INDEX_NAME).getShards().get(0); + } + private ClusterState createIndexWithSettings(ClusterStateChanges cluster, Settings settings) { List allNodes = new ArrayList<>(); // node for primary/local @@ -149,48 +160,32 @@ private ClusterState createIndexWithSettings(ClusterStateChanges cluster, Settin } ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); - CreateIndexRequest request = new CreateIndexRequest("index", settings).waitForActiveShards(ActiveShardCount.NONE); + CreateIndexRequest request = new CreateIndexRequest(INDEX_NAME, 
settings).waitForActiveShards(ActiveShardCount.NONE); state = cluster.createIndex(state, request); return state; } public void testUpdateSearchReplicasOverShardLimit() { - final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); - - List allNodes = new ArrayList<>(); - // node for primary/local - DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); - allNodes.add(localNode); - - allNodes.add(createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)); - - ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); + Settings settings = Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0) + .build(); + createIndex(INDEX_NAME, settings); + Integer maxShardPerNode = ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getDefault(Settings.EMPTY); - CreateIndexRequest request = new CreateIndexRequest( - "index", - Settings.builder() - .put(SETTING_NUMBER_OF_SHARDS, 1) - .put(SETTING_NUMBER_OF_REPLICAS, 0) - .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) - .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) - .build() - ).waitForActiveShards(ActiveShardCount.NONE); - state = cluster.createIndex(state, request); - assertTrue(state.metadata().hasIndex("index")); - rerouteUntilActive(state, cluster); + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(INDEX_NAME).settings( + Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, maxShardPerNode * 2).build() + ); // add another replica - ClusterState finalState = state; - Integer maxShardPerNode = ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getDefault(Settings.EMPTY); - expectThrows( - RuntimeException.class, - () -> 
cluster.updateSettings( - finalState, - new UpdateSettingsRequest("index").settings( - Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, maxShardPerNode * 2).build() - ) - ) + ExecutionException executionException = expectThrows( + ExecutionException.class, + () -> client().admin().indices().updateSettings(updateSettingsRequest).get() ); + Throwable cause = executionException.getCause(); + assertEquals(ValidationException.class, cause.getClass()); } public void testUpdateSearchReplicasOnDocrepCluster() { @@ -206,7 +201,7 @@ public void testUpdateSearchReplicasOnDocrepCluster() { ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); CreateIndexRequest request = new CreateIndexRequest( - "index", + INDEX_NAME, Settings.builder() .put(SETTING_NUMBER_OF_SHARDS, 1) .put(SETTING_NUMBER_OF_REPLICAS, 0) @@ -214,7 +209,7 @@ public void testUpdateSearchReplicasOnDocrepCluster() { .build() ).waitForActiveShards(ActiveShardCount.NONE); state = cluster.createIndex(state, request); - assertTrue(state.metadata().hasIndex("index")); + assertTrue(state.metadata().hasIndex(INDEX_NAME)); rerouteUntilActive(state, cluster); // add another replica @@ -224,7 +219,7 @@ public void testUpdateSearchReplicasOnDocrepCluster() { RuntimeException.class, () -> cluster.updateSettings( finalState, - new UpdateSettingsRequest("index").settings( + new UpdateSettingsRequest(INDEX_NAME).settings( Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, maxShardPerNode * 2).build() ) ) @@ -232,11 +227,51 @@ public void testUpdateSearchReplicasOnDocrepCluster() { } + Path tempDir = createTempDir(); + Path repo = tempDir.resolve("repo"); + + @Override + protected Settings nodeSettings() { + return Settings.builder() + .put(super.nodeSettings()) + .put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .put(buildRemoteStoreNodeAttributes(TEST_RS_REPO, repo)) + .put(Environment.PATH_HOME_SETTING.getKey(), tempDir) + 
.put(Environment.PATH_REPO_SETTING.getKey(), repo) + .build(); + } + + private Settings buildRemoteStoreNodeAttributes(String repoName, Path repoPath) { + String repoTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + repoName + ); + String repoSettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + repoName + ); + + return Settings.builder() + .put("node.attr." + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, repoName) + .put(repoTypeAttributeKey, FsRepository.TYPE) + .put(repoSettingsAttributeKeyPrefix + "location", repoPath) + .put("node.attr." + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, repoName) + .put(repoTypeAttributeKey, FsRepository.TYPE) + .put(repoSettingsAttributeKeyPrefix + "location", repoPath) + .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, repoName) + .put(repoTypeAttributeKey, FsRepository.TYPE) + .put(repoSettingsAttributeKeyPrefix + "location", repoPath) + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), false) + .build(); + } + private static void rerouteUntilActive(ClusterState state, ClusterStateChanges cluster) { - while (state.routingTable().index("index").shard(0).allShardsStarted() == false) { + while (state.routingTable().index(INDEX_NAME).shard(0).allShardsStarted() == false) { state = cluster.applyStartedShards( state, - state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING) + state.routingTable().index(INDEX_NAME).shard(0).shardsWithState(ShardRoutingState.INITIALIZING) ); state = cluster.reroute(state, new ClusterRerouteRequest()); } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 96794a83ef762..535adfbff8dcc 100644 --- 
a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -3011,6 +3011,52 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep closeShards(target); } + public void testRestoreSearchOnlyShardFromStore() throws IOException { + // this test indexes docs on a primary, refreshes, then recovers a new Search Replica and asserts + // all docs are present + String remoteStorePath = createTempDir().toString(); + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, remoteStorePath + "__test") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, remoteStorePath + "__test") + .build(); + IndexShard primary = newStartedShard(true, settings, new InternalEngineFactory()); + indexDoc(primary, "_doc", "1"); + indexDoc(primary, "_doc", "2"); + primary.refresh("test"); + assertDocs(primary, "1", "2"); + + ShardRouting searchReplicaShardRouting = TestShardRouting.newShardRouting( + primary.shardId, + randomAlphaOfLength(10), + false, + true, + ShardRoutingState.INITIALIZING, + RecoverySource.EmptyStoreRecoverySource.INSTANCE + ); + IndexShard replica = newShard(searchReplicaShardRouting, settings, new NRTReplicationEngineFactory()); + recoverShardFromStore(replica); + searchReplicaShardRouting = replica.routingEntry(); + assertDocs(replica, "1", "2"); + assertEquals( + primary.getLatestReplicationCheckpoint().getSegmentInfosVersion(), + replica.getLatestReplicationCheckpoint().getSegmentInfosVersion() + ); + + // move to unassigned while the replica is active, then reinit from existing store. 
+ searchReplicaShardRouting = ShardRoutingHelper.moveToUnassigned( + searchReplicaShardRouting, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "because I say so") + ); + searchReplicaShardRouting = ShardRoutingHelper.initialize(searchReplicaShardRouting, replica.routingEntry().currentNodeId()); + assertEquals(RecoverySource.ExistingStoreRecoverySource.INSTANCE, searchReplicaShardRouting.recoverySource()); + replica = reinitShard(replica, searchReplicaShardRouting); + recoverShardFromStore(replica); + assertDocs(replica, "1", "2"); + closeShards(primary, replica); + } + public void testReaderWrapperIsUsed() throws IOException { IndexShard shard = newStartedShard(true); indexDoc(shard, "_doc", "0", "{\"foo\" : \"bar\"}"); diff --git a/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java b/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java index 9a000a4eeda72..a6af658be2ca1 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java +++ b/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java @@ -342,4 +342,26 @@ public static ShardRouting newShardRouting( -1 ); } + + public static ShardRouting newShardRouting( + ShardId shardId, + String currentNodeId, + boolean primary, + boolean searchOnly, + ShardRoutingState state, + RecoverySource recoverySource + ) { + return new ShardRouting( + shardId, + currentNodeId, + null, + primary, + searchOnly, + state, + recoverySource, + buildUnassignedInfo(state), + buildAllocationId(state), + -1 + ); + } }