diff --git a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java index bcb4a199bc32d..d7699ef3b1e9a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java @@ -37,8 +37,8 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.index.SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS; import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING; -import static org.opensearch.index.SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class SegmentReplicationPressureIT extends SegmentReplicationBaseIT { @@ -49,7 +49,6 @@ public class SegmentReplicationPressureIT extends SegmentReplicationBaseIT { protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - .put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true) .put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueSeconds(1)) .put(MAX_INDEXING_CHECKPOINTS.getKey(), MAX_CHECKPOINTS_BEHIND) .build(); @@ -238,6 +237,30 @@ public void testFailStaleReplica() throws Exception { assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId()); } + public void testWithDocumentReplicationEnabledIndex() throws Exception { + Settings settings = Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build(); + // Starts a primary and replica node. + final String primaryNode = internalCluster().startNode(settings); + createIndex( + INDEX_NAME, + Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build() + ); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replicaNode = internalCluster().startNode(settings); + ensureGreen(INDEX_NAME); + final AtomicInteger totalDocs = new AtomicInteger(0); + // Index docs until replica stale limit is reached. + totalDocs.getAndSet(indexUntilCheckpointCount()); + // index again after stale limit. + indexDoc(); + refresh(INDEX_NAME); + totalDocs.incrementAndGet(); + // verify total doc count is same and docs are not rejected. + assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs.get()); + assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs.get()); + + } + public void testBulkWritesRejected() throws Exception { final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME); diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 3b1d71675e75b..28b6d0a7d12ae 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -52,7 +52,7 @@ public class SegmentReplicationPressureService implements Closeable { */ public static final Setting SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED = Setting.boolSetting( "segrep.pressure.enabled", - false, + true, Setting.Property.Dynamic, Setting.Property.NodeScope ); @@ -128,9 +128,11 @@ AsyncFailStaleReplicaTask getFailStaleReplicaTask() { public void isSegrepLimitBreached(ShardId shardId) { final IndexService indexService = indicesService.indexService(shardId.getIndex()); - final IndexShard shard = indexService.getShard(shardId.id()); - if (isSegmentReplicationBackpressureEnabled && shard.indexSettings().isSegRepEnabled() && shard.routingEntry().primary()) { - validateReplicationGroup(shard); + if (indexService != null) { + final IndexShard shard = indexService.getShard(shardId.id()); + if (isSegmentReplicationBackpressureEnabled && shard.indexSettings().isSegRepEnabled() && shard.routingEntry().primary()) { + validateReplicationGroup(shard); + } } } @@ -233,6 +235,9 @@ protected void runInternal() { stats.getShardStats().get(shardId).getReplicaStats() ); final IndexService indexService = pressureService.indicesService.indexService(shardId.getIndex()); + if (indexService.getIndexSettings() != null && indexService.getIndexSettings().isSegRepEnabled() == false) { + return; + } final IndexShard primaryShard = indexService.getShard(shardId.getId()); for (SegmentReplicationShardStats staleReplica : staleReplicas) { if (staleReplica.getCurrentReplicationTimeMillis() > 2 * pressureService.maxReplicationTime.millis()) { diff --git a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java index 1ebdd111bfed3..5217a9ff4f00e 100644 --- a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java @@ -41,14 +41,12 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.times; import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING; -import static org.opensearch.index.SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED; public class SegmentReplicationPressureServiceTests extends OpenSearchIndexLevelReplicationTestCase { private static ShardStateAction shardStateAction = Mockito.mock(ShardStateAction.class); private static final Settings settings = Settings.builder() .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true) .put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueSeconds(5)) .build(); @@ -99,10 +97,7 @@ public void testIsSegrepLimitBreached() throws Exception { } public void testIsSegrepLimitBreached_onlyCheckpointLimitBreached() throws Exception { - final Settings settings = Settings.builder() - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true) - .build(); + final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { shards.startAll(); @@ -126,10 +121,7 @@ public void testIsSegrepLimitBreached_onlyCheckpointLimitBreached() throws Excep } public void testIsSegrepLimitBreached_onlyTimeLimitBreached() throws Exception { - final Settings settings = Settings.builder() - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true) - .build(); + final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { shards.startAll(); @@ -194,7 +186,6 @@ public void testIsSegrepLimitBreached_underStaleNodeLimit() throws Exception { public void testFailStaleReplicaTask() throws Exception { final Settings settings = Settings.builder() .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true) .put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(10)) .build();