From 4852671545feed76d33ad1ba87f88f5ec0ee8cab Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 5 Jan 2023 15:28:45 -0800 Subject: [PATCH 1/2] Fix flaky SR test testStartReplicaAfterPrimaryIndexesDocs. This test was failing because we are validating post recovery if a shard is able to perform segrep while also performing validation of a passed-in checkpoint. In the post recovery test this checkpoint is always empty, yet the shard will be ahead of this checkpoint after docs are indexed. This change differentiates shard validation from checkpoint validation. Signed-off-by: Marc Handalian Fix spotless. Signed-off-by: Marc Handalian Fix testIsSegmentReplicationAllowed_WrongEngineType. Signed-off-by: Marc Handalian Update warn logs in isSegmentReplicationAllowed. Signed-off-by: Marc Handalian --- CHANGELOG.md | 1 + .../replication/SegmentReplicationIT.java | 1 - .../opensearch/index/shard/IndexShard.java | 50 +++++++++++++++---- .../cluster/IndicesClusterStateService.java | 7 +-- .../SegmentReplicationIndexShardTests.java | 7 +++ 5 files changed, 50 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index af5484aa0fd1a..6598c70fa4760 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support OpenSSL Provider with default Netty allocator ([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460)) - Increasing timeout of testQuorumRecovery to 90 seconds from 30 ([#5651](https://github.com/opensearch-project/OpenSearch/pull/5651)) - [Segment Replication] Fix for peer recovery ([#5344](https://github.com/opensearch-project/OpenSearch/pull/5344)) +- Fixed flaky test SegmentReplicationIT.testStartReplicaAfterPrimaryIndexesDocs ([#5722](https://github.com/opensearch-project/OpenSearch/pull/5722)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java 
b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 02da12904a6e7..0d8082da00cd0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -522,7 +522,6 @@ public void testCancellation() throws Exception { assertDocCounts(docCount, primaryNode); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { final String primaryNode = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5a5356282681e..72e8854d9a3ec 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -74,6 +74,7 @@ import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.common.Booleans; import org.opensearch.common.CheckedConsumer; import org.opensearch.common.CheckedFunction; @@ -1454,22 +1455,53 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { } /** - * Checks if checkpoint should be processed - * - * @param requestCheckpoint received checkpoint that is checked for processing - * @return true if checkpoint should be processed + * Checks if this shard is able to perform segment replication. + * @return - True if the shard is able to perform segment replication. 
*/ - public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { - if (state().equals(IndexShardState.STARTED) == false) { - logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state())); + public boolean isSegmentReplicationAllowed() { + if (indexSettings.isSegRepEnabled() == false) { + logger.warn("Attempting to perform segment replication when it is not enabled on the index"); return false; } if (getReplicationTracker().isPrimaryMode()) { - logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints."); + logger.warn("Shard is in primary mode and cannot perform segment replication as a replica."); return false; } if (this.routingEntry().primary()) { - logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints."); + logger.warn("Shard is marked as primary and cannot perform segment replication as a replica"); + return false; + } + if (state().equals(IndexShardState.STARTED) == false + && (state() == IndexShardState.POST_RECOVERY && shardRouting.state() == ShardRoutingState.INITIALIZING) == false) { + logger.warn( + () -> new ParameterizedMessage( + "Shard is not started or recovering {} {} and cannot perform segment replication as a replica", + state(), + shardRouting.state() + ) + ); + return false; + } + if (getReplicationEngine().isEmpty()) { + logger.warn( + () -> new ParameterizedMessage( + "Shard does not have the correct engine type to perform segment replication {}.", + getEngine().getClass() + ) + ); + return false; + } + return true; + } + + /** + * Checks if checkpoint should be processed + * + * @param requestCheckpoint received checkpoint that is checked for processing + * @return true if checkpoint should be processed + */ + public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { + if (isSegmentReplicationAllowed() == false) { return false; } 
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 83f4e0c7cbed9..e8adcbdc1c89a 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -47,7 +47,6 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RoutingNode; @@ -811,11 +810,7 @@ private void forceSegmentReplication( StepListener forceSegRepListener ) { IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id()); - if (indexShard != null - && indexShard.indexSettings().isSegRepEnabled() - && shardRouting.primary() == false - && shardRouting.state() == ShardRoutingState.INITIALIZING - && indexShard.state() == IndexShardState.POST_RECOVERY) { + if (indexShard != null && indexShard.isSegmentReplicationAllowed()) { segmentReplicationTargetService.startReplication( ReplicationCheckpoint.empty(shardRouting.shardId()), indexShard, diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index c46f97b5ec785..44771faf36871 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -27,6 +27,7 @@ import org.opensearch.index.IndexSettings; import 
org.opensearch.index.engine.DocIdSeqNoAndSource; import org.opensearch.index.engine.InternalEngine; +import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.MapperService; @@ -97,6 +98,12 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException { closeShards(indexShard); } + public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException { + final IndexShard indexShard = newShard(false, settings, new InternalEngineFactory()); + assertFalse(indexShard.isSegmentReplicationAllowed()); + closeShards(indexShard); + } + public void testSegmentReplication_Index_Update_Delete() throws Exception { String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) { From 7b2bc53ac9278c0637cf9c2b5fd915114a5b44f8 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Fri, 6 Jan 2023 12:05:24 -0800 Subject: [PATCH 2/2] PR feedback. 
Signed-off-by: Marc Handalian --- CHANGELOG.md | 1 - server/src/main/java/org/opensearch/index/shard/IndexShard.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6598c70fa4760..af5484aa0fd1a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -90,7 +90,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support OpenSSL Provider with default Netty allocator ([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460)) - Increasing timeout of testQuorumRecovery to 90 seconds from 30 ([#5651](https://github.com/opensearch-project/OpenSearch/pull/5651)) - [Segment Replication] Fix for peer recovery ([#5344](https://github.com/opensearch-project/OpenSearch/pull/5344)) -- Fixed flaky test SegmentReplicationIT.testStartReplicaAfterPrimaryIndexesDocs ([#5722](https://github.com/opensearch-project/OpenSearch/pull/5722)) ### Security diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 72e8854d9a3ec..60c52e9583821 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1455,7 +1455,7 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { } /** - * Checks if this shard is able to perform segment replication. + * Checks if this target shard should start a round of segment replication. * @return - True if the shard is able to perform segment replication. */ public boolean isSegmentReplicationAllowed() {