diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index bc63c36ae838f..3d54788cc338a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -16,6 +16,8 @@ import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.segments.ShardSegments; import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.client.Requests; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -55,15 +57,17 @@ import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class SegmentReplicationIT extends OpenSearchIntegTestCase { - private static final String INDEX_NAME = "test-idx-1"; - private static final int SHARD_COUNT = 1; - private static final int REPLICA_COUNT = 1; + protected static final String INDEX_NAME = "test-idx-1"; + protected static final int SHARD_COUNT = 1; + protected static final int REPLICA_COUNT = 1; @Override protected Collection> nodePlugins() { @@ -91,6 +95,26 @@ protected Settings featureFlagSettings() { return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build(); } + public void ingestDocs(int docCount) throws Exception { + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(docCount); + waitForDocs(docCount, indexer); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + } + } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testPrimaryStopped_ReplicaPromoted() throws Exception { final String primary = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); @@ -132,6 +156,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { assertSegmentStats(REPLICA_COUNT); } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testRestartPrimary() throws Exception { final String primary = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); @@ -266,6 +291,7 @@ public void testAddNewReplicaFailure() throws Exception { assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME))); } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { final String nodeA = internalCluster().startNode(featureFlagSettings()); final String nodeB = internalCluster().startNode(featureFlagSettings()); @@ -497,6 +523,7 @@ public void testCancellation() throws Exception { assertDocCounts(docCount, primaryNode); } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { final String primaryNode = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); @@ -598,6 +625,61 @@ public void testDeleteOperations() throws Exception { } } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") + public void testUpdateOperations() throws Exception { + final String primary = internalCluster().startNode(featureFlagSettings()); + createIndex(INDEX_NAME); + ensureYellow(INDEX_NAME); + final String replica = internalCluster().startNode(featureFlagSettings()); + + final int initialDocCount = scaledRandomIntBetween(0, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + + // wait a short amount of time to give replication a chance to complete. + assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + waitForReplicaUpdate(); + + assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + Set ids = indexer.getIds(); + String id = ids.toArray()[0].toString(); + UpdateResponse updateResponse = client(primary).prepareUpdate(INDEX_NAME, id) + .setDoc(Requests.INDEX_CONTENT_TYPE, "foo", "baz") + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .get(); + assertFalse("request shouldn't have forced a refresh", updateResponse.forcedRefresh()); + assertEquals(2, updateResponse.getVersion()); + + refresh(INDEX_NAME); + waitForReplicaUpdate(); + + assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); + assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); + + } + } + private void assertSegmentStats(int numberOfReplicas) throws IOException { final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet(); @@ -689,7 +771,7 @@ public void testDropPrimaryDuringReplication() throws Exception { /** * Waits until the replica is caught up to the latest primary segments gen. - * @throws Exception + * @throws Exception if assertion fails */ private void waitForReplicaUpdate() throws Exception { // wait until the replica has the latest segment generation. @@ -706,7 +788,7 @@ private void waitForReplicaUpdate() throws Exception { // if we don't have any segments yet, proceed. final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); logger.debug("Primary Segments: {}", primaryShardSegments.getSegments()); - if (primaryShardSegments.getSegments().isEmpty() == false) { + if (primaryShardSegments.getSegments().isEmpty() == false && replicaShardSegments != null) { final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get(); for (ShardSegments shardSegments : replicaShardSegments) { diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java new file mode 100644 index 0000000000000..4e27e6a5cbd89 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java @@ -0,0 +1,280 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.snapshots; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import org.junit.BeforeClass; +import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder; +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.rest.RestStatus; +import org.opensearch.test.BackgroundIndexer; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationSnapshotIT extends AbstractSnapshotIntegTestCase { + private static final String INDEX_NAME = "test-segrep-idx"; + private static final String RESTORED_INDEX_NAME = INDEX_NAME + "-restored"; + private static final int SHARD_COUNT = 1; + private static final int REPLICA_COUNT = 1; + private static final int DOC_COUNT = 1010; + + private static final String REPOSITORY_NAME = "test-segrep-repo"; + private static final String SNAPSHOT_NAME = "test-segrep-snapshot"; + + @BeforeClass + public static void assumeFeatureFlag() { + assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE))); + } + + public Settings segRepEnableIndexSettings() { + return getShardSettings().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); + } + + public Settings docRepEnableIndexSettings() { + return getShardSettings().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); + } + + public Settings.Builder getShardSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT); + } + + public Settings restoreIndexSegRepSettings() { + return Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); + } + + public Settings restoreIndexDocRepSettings() { + return Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + public void ingestData(int docCount, String indexName) throws Exception { + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + indexName, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(docCount); + waitForDocs(docCount, indexer); + refresh(indexName); + } + } + + // Start cluster with provided settings and return the node names as list + public List startClusterWithSettings(Settings indexSettings, int replicaCount) throws Exception { + // Start primary + final String primaryNode = internalCluster().startNode(); + List nodeNames = new ArrayList<>(); + nodeNames.add(primaryNode); + for (int i = 0; i < replicaCount; i++) { + nodeNames.add(internalCluster().startNode()); + } + createIndex(INDEX_NAME, indexSettings); + ensureGreen(INDEX_NAME); + // Ingest data + ingestData(DOC_COUNT, INDEX_NAME); + return nodeNames; + } + + public void createSnapshot() { + // Snapshot declaration + Path absolutePath = randomRepoPath().toAbsolutePath(); + // Create snapshot + createRepository(REPOSITORY_NAME, "fs", absolutePath); + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME) + .setWaitForCompletion(true) + .setIndices(INDEX_NAME) + .get(); + assertThat( + createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()) + ); + assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + } + + public RestoreSnapshotResponse restoreSnapshotWithSettings(Settings indexSettings) { + RestoreSnapshotRequestBuilder builder = client().admin() + .cluster() + .prepareRestoreSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME) + .setWaitForCompletion(false) + .setRenamePattern(INDEX_NAME) + .setRenameReplacement(RESTORED_INDEX_NAME); + if (indexSettings != null) { + builder.setIndexSettings(indexSettings); + } + return builder.get(); + } + + public void testRestoreOnSegRep() throws Exception { + // Start cluster with one primary and one replica node + startClusterWithSettings(segRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME)); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") + public void testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion() throws Exception { + startClusterWithSettings(segRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME)); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ingestData(5000, RESTORED_INDEX_NAME); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT + 5000); + } + + public void testSnapshotOnDocRep_RestoreOnSegRep() throws Exception { + startClusterWithSettings(docRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexSegRepSettings()); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); + + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } + + public void testSnapshotOnSegRep_RestoreOnDocRep() throws Exception { + // Start a cluster with one primary and one replica + startClusterWithSettings(segRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings()); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "DOCUMENT"); + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } + + public void testSnapshotOnDocRep_RestoreOnDocRep() throws Exception { + startClusterWithSettings(docRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings()); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "DOCUMENT"); + + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } + + public void testRestoreOnReplicaNode() throws Exception { + List nodeNames = startClusterWithSettings(segRepEnableIndexSettings(), 1); + final String primaryNode = nodeNames.get(0); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME)); + + // stop the primary node so that restoration happens on replica node + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + internalCluster().startNode(); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index a1a755cd34e9b..dfeda6809b77b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -74,6 +74,7 @@ import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.common.Booleans; import org.opensearch.common.CheckedConsumer; import org.opensearch.common.CheckedFunction; @@ -1437,22 +1438,53 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { } /** - * Checks if checkpoint should be processed - * - * @param requestCheckpoint received checkpoint that is checked for processing - * @return true if checkpoint should be processed + * Checks if this target shard should start a round of segment replication. + * @return - True if the shard is able to perform segment replication. */ - public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { - if (state().equals(IndexShardState.STARTED) == false) { - logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state())); + public boolean isSegmentReplicationAllowed() { + if (indexSettings.isSegRepEnabled() == false) { + logger.warn("Attempting to perform segment replication when it is not enabled on the index"); return false; } if (getReplicationTracker().isPrimaryMode()) { - logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints."); + logger.warn("Shard is in primary mode and cannot perform segment replication as a replica."); return false; } if (this.routingEntry().primary()) { - logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints."); + logger.warn("Shard is marked as primary and cannot perform segment replication as a replica"); + return false; + } + if (state().equals(IndexShardState.STARTED) == false + && (state() == IndexShardState.POST_RECOVERY && shardRouting.state() == ShardRoutingState.INITIALIZING) == false) { + logger.warn( + () -> new ParameterizedMessage( + "Shard is not started or recovering {} {} and cannot perform segment replication as a replica", + state(), + shardRouting.state() + ) + ); + return false; + } + if (getReplicationEngine().isEmpty()) { + logger.warn( + () -> new ParameterizedMessage( + "Shard does not have the correct engine type to perform segment replication {}.", + getEngine().getClass() + ) + ); + return false; + } + return true; + } + + /** + * Checks if checkpoint should be processed + * + * @param requestCheckpoint received checkpoint that is checked for processing + * @return true if checkpoint should be processed + */ + public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { + if (isSegmentReplicationAllowed() == false) { return false; } ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 59263878f1f76..1ec5a9cb4ecc3 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -51,7 +51,6 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; -import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.RecoverySource.Type; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; @@ -811,11 +810,7 @@ private void forceSegmentReplication( StepListener forceSegRepListener ) { IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id()); - if (indexShard != null - && indexShard.indexSettings().isSegRepEnabled() - && shardRouting.primary() == false - && shardRouting.state() == ShardRoutingState.INITIALIZING - && indexShard.state() == IndexShardState.POST_RECOVERY) { + if (indexShard != null && indexShard.isSegmentReplicationAllowed()) { segmentReplicationTargetService.startReplication( ReplicationCheckpoint.empty(shardRouting.shardId()), indexShard, diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 11fae987174f6..8562e6de53332 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -27,6 +27,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.DocIdSeqNoAndSource; import org.opensearch.index.engine.InternalEngine; +import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.MapperService; @@ -90,6 +91,12 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException { closeShards(indexShard); } + public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException { + final IndexShard indexShard = newShard(false, settings, new InternalEngineFactory()); + assertFalse(indexShard.isSegmentReplicationAllowed()); + closeShards(indexShard); + } + public void testSegmentReplication_Index_Update_Delete() throws Exception { String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) {