From 51f7ea04333296fac5fcedbef8f4f69682bbe0ae Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 19 Jan 2023 18:02:32 -0700 Subject: [PATCH] [Backport 2.x] Backport SegmentReplication test muting missing from 2.x. (#5945) * [Segment Replication] Add snapshot and restore tests for segment replication feature (#3993) * [Segment Replication] Add snapshots tests with segment replication enabled Signed-off-by: Suraj Singh * Fix spotless failures Signed-off-by: Suraj Singh * Add changelog entry, address review comments, add failover test Signed-off-by: Suraj Singh * Fix spotless failures Signed-off-by: Suraj Singh * Address review comments 2 Signed-off-by: Suraj Singh Signed-off-by: Suraj Singh * Remove changelog update. Signed-off-by: Marc Handalian * Mute flaky test testStartReplicaAfterPrimaryIndexesDocs. (#5714) Signed-off-by: Marc Handalian Signed-off-by: Marc Handalian * Fix flaky Segment Replication test testStartReplicaAfterPrimaryIndexesDocs. (#5722) * Fix flaky SR test testStartReplicaAfterPrimaryIndexesDocs. This test was failing because we are validating post recovery if a shard is able to perform segrep while also performing validation of a passed-in checkpoint. In the post recovery test this checkpoint is always empty, yet the shard will be ahead of this checkpoint after docs are indexed. This change differentiates shard validation from checkpoint validation. Signed-off-by: Marc Handalian Fix spotless. Signed-off-by: Marc Handalian Fix testIsSegmentReplicationAllowed_WrongEngineType. Signed-off-by: Marc Handalian Update warn logs in isSegmentReplicationAllowed. Signed-off-by: Marc Handalian * PR feedback. Signed-off-by: Marc Handalian Signed-off-by: Marc Handalian * [Segment Replication] Mute flaky tests (#5739) Signed-off-by: Suraj Singh Signed-off-by: Suraj Singh * [Segment Replication] Mute flaky tests (#5742) Signed-off-by: Suraj Singh Signed-off-by: Suraj Singh * Fix spotless. Signed-off-by: Marc Handalian * Muting flaky SegmentReplication ITs. 
(#5700) Signed-off-by: Marc Handalian Signed-off-by: Marc Handalian Signed-off-by: Suraj Singh Signed-off-by: Marc Handalian Co-authored-by: Suraj Singh --- .../replication/SegmentReplicationIT.java | 94 +++++- .../SegmentReplicationSnapshotIT.java | 280 ++++++++++++++++++ .../opensearch/index/shard/IndexShard.java | 50 +++- .../cluster/IndicesClusterStateService.java | 7 +- .../SegmentReplicationIndexShardTests.java | 7 + 5 files changed, 417 insertions(+), 21 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index bc63c36ae838f..3d54788cc338a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -16,6 +16,8 @@ import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.segments.ShardSegments; import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.client.Requests; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -55,15 +57,17 @@ import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits; 
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class SegmentReplicationIT extends OpenSearchIntegTestCase { - private static final String INDEX_NAME = "test-idx-1"; - private static final int SHARD_COUNT = 1; - private static final int REPLICA_COUNT = 1; + protected static final String INDEX_NAME = "test-idx-1"; + protected static final int SHARD_COUNT = 1; + protected static final int REPLICA_COUNT = 1; @Override protected Collection> nodePlugins() { @@ -91,6 +95,26 @@ protected Settings featureFlagSettings() { return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build(); } + public void ingestDocs(int docCount) throws Exception { + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(docCount); + waitForDocs(docCount, indexer); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + } + } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testPrimaryStopped_ReplicaPromoted() throws Exception { final String primary = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); @@ -132,6 +156,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { assertSegmentStats(REPLICA_COUNT); } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testRestartPrimary() throws Exception { final String primary = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); @@ -266,6 +291,7 @@ public void testAddNewReplicaFailure() throws Exception { assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME))); } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { final String nodeA = 
internalCluster().startNode(featureFlagSettings()); final String nodeB = internalCluster().startNode(featureFlagSettings()); @@ -497,6 +523,7 @@ public void testCancellation() throws Exception { assertDocCounts(docCount, primaryNode); } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { final String primaryNode = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); @@ -598,6 +625,61 @@ public void testDeleteOperations() throws Exception { } } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") + public void testUpdateOperations() throws Exception { + final String primary = internalCluster().startNode(featureFlagSettings()); + createIndex(INDEX_NAME); + ensureYellow(INDEX_NAME); + final String replica = internalCluster().startNode(featureFlagSettings()); + + final int initialDocCount = scaledRandomIntBetween(0, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + + // wait a short amount of time to give replication a chance to complete. 
+ assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + waitForReplicaUpdate(); + + assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + Set ids = indexer.getIds(); + String id = ids.toArray()[0].toString(); + UpdateResponse updateResponse = client(primary).prepareUpdate(INDEX_NAME, id) + .setDoc(Requests.INDEX_CONTENT_TYPE, "foo", "baz") + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .get(); + assertFalse("request shouldn't have forced a refresh", updateResponse.forcedRefresh()); + assertEquals(2, updateResponse.getVersion()); + + refresh(INDEX_NAME); + waitForReplicaUpdate(); + + assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); + assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); + + } + } + private void assertSegmentStats(int numberOfReplicas) throws IOException { final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet(); @@ -689,7 +771,7 @@ public void testDropPrimaryDuringReplication() throws Exception { /** * Waits until the replica is caught up to the latest primary segments gen. - * @throws Exception + * @throws Exception if assertion fails */ private void waitForReplicaUpdate() throws Exception { // wait until the replica has the latest segment generation. 
@@ -706,7 +788,7 @@ private void waitForReplicaUpdate() throws Exception { // if we don't have any segments yet, proceed. final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); logger.debug("Primary Segments: {}", primaryShardSegments.getSegments()); - if (primaryShardSegments.getSegments().isEmpty() == false) { + if (primaryShardSegments.getSegments().isEmpty() == false && replicaShardSegments != null) { final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get(); for (ShardSegments shardSegments : replicaShardSegments) { diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java new file mode 100644 index 0000000000000..4e27e6a5cbd89 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java @@ -0,0 +1,280 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.snapshots; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import org.junit.BeforeClass; +import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder; +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.rest.RestStatus; +import org.opensearch.test.BackgroundIndexer; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationSnapshotIT extends AbstractSnapshotIntegTestCase { + private static final String INDEX_NAME = "test-segrep-idx"; + private static final String RESTORED_INDEX_NAME = INDEX_NAME + "-restored"; + private static final int SHARD_COUNT = 1; + private static final int REPLICA_COUNT = 1; + private static final int DOC_COUNT = 1010; + + private static final String REPOSITORY_NAME = "test-segrep-repo"; + private static final String 
SNAPSHOT_NAME = "test-segrep-snapshot"; + + @BeforeClass + public static void assumeFeatureFlag() { + assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE))); + } + + public Settings segRepEnableIndexSettings() { + return getShardSettings().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); + } + + public Settings docRepEnableIndexSettings() { + return getShardSettings().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); + } + + public Settings.Builder getShardSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT); + } + + public Settings restoreIndexSegRepSettings() { + return Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); + } + + public Settings restoreIndexDocRepSettings() { + return Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + public void ingestData(int docCount, String indexName) throws Exception { + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + indexName, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(docCount); + waitForDocs(docCount, indexer); + refresh(indexName); + } + } + + // Start cluster with provided settings and return the node names as list + public List startClusterWithSettings(Settings indexSettings, int replicaCount) throws Exception { + // Start primary + final String primaryNode = internalCluster().startNode(); + List nodeNames = new ArrayList<>(); + nodeNames.add(primaryNode); + for (int i = 0; i < replicaCount; i++) { + nodeNames.add(internalCluster().startNode()); + } + createIndex(INDEX_NAME, 
indexSettings); + ensureGreen(INDEX_NAME); + // Ingest data + ingestData(DOC_COUNT, INDEX_NAME); + return nodeNames; + } + + public void createSnapshot() { + // Snapshot declaration + Path absolutePath = randomRepoPath().toAbsolutePath(); + // Create snapshot + createRepository(REPOSITORY_NAME, "fs", absolutePath); + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME) + .setWaitForCompletion(true) + .setIndices(INDEX_NAME) + .get(); + assertThat( + createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()) + ); + assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + } + + public RestoreSnapshotResponse restoreSnapshotWithSettings(Settings indexSettings) { + RestoreSnapshotRequestBuilder builder = client().admin() + .cluster() + .prepareRestoreSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME) + .setWaitForCompletion(false) + .setRenamePattern(INDEX_NAME) + .setRenameReplacement(RESTORED_INDEX_NAME); + if (indexSettings != null) { + builder.setIndexSettings(indexSettings); + } + return builder.get(); + } + + public void testRestoreOnSegRep() throws Exception { + // Start cluster with one primary and one replica node + startClusterWithSettings(segRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME)); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + 
assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") + public void testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion() throws Exception { + startClusterWithSettings(segRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME)); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ingestData(5000, RESTORED_INDEX_NAME); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT + 5000); + } + + public void testSnapshotOnDocRep_RestoreOnSegRep() throws Exception { + startClusterWithSettings(docRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexSegRepSettings()); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new 
GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); + + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } + + public void testSnapshotOnSegRep_RestoreOnDocRep() throws Exception { + // Start a cluster with one primary and one replica + startClusterWithSettings(segRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings()); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "DOCUMENT"); + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } + + public void testSnapshotOnDocRep_RestoreOnDocRep() throws Exception { + startClusterWithSettings(docRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings()); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + 
assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "DOCUMENT"); + + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } + + public void testRestoreOnReplicaNode() throws Exception { + List nodeNames = startClusterWithSettings(segRepEnableIndexSettings(), 1); + final String primaryNode = nodeNames.get(0); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME)); + + // stop the primary node so that restoration happens on replica node + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + internalCluster().startNode(); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index a1a755cd34e9b..dfeda6809b77b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -74,6 +74,7 @@ import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import 
org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.common.Booleans; import org.opensearch.common.CheckedConsumer; import org.opensearch.common.CheckedFunction; @@ -1437,22 +1438,53 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { } /** - * Checks if checkpoint should be processed - * - * @param requestCheckpoint received checkpoint that is checked for processing - * @return true if checkpoint should be processed + * Checks if this target shard should start a round of segment replication. + * @return - True if the shard is able to perform segment replication. */ - public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { - if (state().equals(IndexShardState.STARTED) == false) { - logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state())); + public boolean isSegmentReplicationAllowed() { + if (indexSettings.isSegRepEnabled() == false) { + logger.warn("Attempting to perform segment replication when it is not enabled on the index"); return false; } if (getReplicationTracker().isPrimaryMode()) { - logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints."); + logger.warn("Shard is in primary mode and cannot perform segment replication as a replica."); return false; } if (this.routingEntry().primary()) { - logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints."); + logger.warn("Shard is marked as primary and cannot perform segment replication as a replica"); + return false; + } + if (state().equals(IndexShardState.STARTED) == false + && (state() == IndexShardState.POST_RECOVERY && shardRouting.state() == ShardRoutingState.INITIALIZING) == false) { + logger.warn( + () -> new ParameterizedMessage( + "Shard is not started or recovering {} {} and cannot perform segment replication as a replica", + 
state(), + shardRouting.state() + ) + ); + return false; + } + if (getReplicationEngine().isEmpty()) { + logger.warn( + () -> new ParameterizedMessage( + "Shard does not have the correct engine type to perform segment replication {}.", + getEngine().getClass() + ) + ); + return false; + } + return true; + } + + /** + * Checks if checkpoint should be processed + * + * @param requestCheckpoint received checkpoint that is checked for processing + * @return true if checkpoint should be processed + */ + public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { + if (isSegmentReplicationAllowed() == false) { return false; } ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 59263878f1f76..1ec5a9cb4ecc3 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -51,7 +51,6 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; -import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.RecoverySource.Type; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; @@ -811,11 +810,7 @@ private void forceSegmentReplication( StepListener forceSegRepListener ) { IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id()); - if (indexShard != null - && indexShard.indexSettings().isSegRepEnabled() - && shardRouting.primary() == false - && shardRouting.state() == ShardRoutingState.INITIALIZING - && indexShard.state() == IndexShardState.POST_RECOVERY) { + if (indexShard != null && 
indexShard.isSegmentReplicationAllowed()) { segmentReplicationTargetService.startReplication( ReplicationCheckpoint.empty(shardRouting.shardId()), indexShard, diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 11fae987174f6..8562e6de53332 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -27,6 +27,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.DocIdSeqNoAndSource; import org.opensearch.index.engine.InternalEngine; +import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.MapperService; @@ -90,6 +91,12 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException { closeShards(indexShard); } + public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException { + final IndexShard indexShard = newShard(false, settings, new InternalEngineFactory()); + assertFalse(indexShard.isSegmentReplicationAllowed()); + closeShards(indexShard); + } + public void testSegmentReplication_Index_Update_Delete() throws Exception { String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) {