diff --git a/CHANGELOG.md b/CHANGELOG.md index 01c56fe762ec5..b0def508db314 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -175,6 +175,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix for stuck update action in a bulk with `retry_on_conflict` property ([#11152](https://github.com/opensearch-project/OpenSearch/issues/11152)) - Remove shadowJar from `lang-painless` module publication ([#11369](https://github.com/opensearch-project/OpenSearch/issues/11369)) - Fix remote shards balancer and remove unused variables ([#11167](https://github.com/opensearch-project/OpenSearch/pull/11167)) +- Fix bug where replication lag grows post primary relocation ([#11238](https://github.com/opensearch-project/OpenSearch/pull/11238)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java index b7b3f1d14f422..d5cdc22a15478 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java @@ -8,10 +8,16 @@ package org.opensearch.remotestore; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.index.Index; import org.opensearch.index.IndexService; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.SegmentReplicationState; @@ -20,10 +26,12 @@ import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.disruption.SlowClusterStateProcessing; import java.nio.file.Path; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; /** * This class runs tests with remote store + segRep while blocking file downloads @@ -111,6 +119,75 @@ public void testCancelReplicationWhileFetchingMetadata() throws Exception { cleanupRepo(); } + public void testUpdateVisibleCheckpointWithLaggingClusterStateUpdates_primaryRelocation() throws Exception { + Path location = randomRepoPath().toAbsolutePath(); + Settings nodeSettings = Settings.builder().put(buildRemoteStoreNodeAttributes(location, 0d, "metadata", Long.MAX_VALUE)).build(); + internalCluster().startClusterManagerOnlyNode(nodeSettings); + internalCluster().startDataOnlyNodes(2, nodeSettings); + final Settings indexSettings = Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build(); + createIndex(INDEX_NAME, indexSettings); + ensureGreen(INDEX_NAME); + final Set dataNodeNames = internalCluster().getDataNodeNames(); + final String replicaNode = getNode(dataNodeNames, false); + final String oldPrimary = getNode(dataNodeNames, true); + + // index a doc. + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", randomInt()).get(); + refresh(INDEX_NAME); + + logger.info("--> start another node"); + final String newPrimary = internalCluster().startDataOnlyNode(nodeSettings); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("4") + .get(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(replicaNode, random(), 0, 0, 1000, 2000); + internalCluster().setDisruptionScheme(disruption); + disruption.startDisrupting(); + + // relocate the primary + logger.info("--> relocate the shard"); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary)) + .execute() + .actionGet(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(new TimeValue(5, TimeUnit.MINUTES)) + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + IndexShard newPrimary_shard = getIndexShard(newPrimary, INDEX_NAME); + IndexShard replica = getIndexShard(replicaNode, INDEX_NAME); + assertBusy(() -> { + assertEquals( + newPrimary_shard.getLatestReplicationCheckpoint().getSegmentInfosVersion(), + replica.getLatestReplicationCheckpoint().getSegmentInfosVersion() + ); + }); + + assertBusy(() -> { + ClusterStatsResponse clusterStatsResponse = client().admin().cluster().prepareClusterStats().get(); + ReplicationStats replicationStats = clusterStatsResponse.getIndicesStats().getSegments().getReplicationStats(); + assertEquals(0L, replicationStats.maxBytesBehind); + assertEquals(0L, replicationStats.maxReplicationLag); + assertEquals(0L, replicationStats.totalBytesBehind); + }); + disruption.stopDisrupting(); + disableRepoConsistencyCheck("Remote Store Creates System Repository"); + cleanupRepo(); + } + private String getNode(Set dataNodeNames, boolean primary) { assertEquals(2, dataNodeNames.size()); for (String name : dataNodeNames) { diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index 3113428ec60ef..7b9c1d3aa548f 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -1322,8 +1322,10 @@ private SegmentReplicationShardStats buildShardStats(final String allocationId, allocationId, cps.checkpointTimers.size(), bytesBehind, - cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0), - cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0), + bytesBehind > 0L ? cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0) : 0, + bytesBehind > 0L + ? cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0) + : 0, cps.lastCompletedReplicationLag ); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 7f9e5f31d1976..cbb246219546b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1764,8 +1764,8 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp if (isSegmentReplicationAllowed() == false) { return false; } - ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); - if (localCheckpoint.isAheadOf(requestCheckpoint)) { + final ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); + if (requestCheckpoint.isAheadOf(localCheckpoint) == false) { logger.trace( () -> new ParameterizedMessage( "Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", @@ -1775,12 +1775,6 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp ); return false; } - if (localCheckpoint.equals(requestCheckpoint)) { - logger.trace( - () -> new ParameterizedMessage("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint) - ); - return false; - } return true; } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index cb738d74000bc..d6db154a4e0e3 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -15,10 +15,13 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.action.support.ChannelActionListener; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterStateListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.AbstractRunnable; @@ -26,6 +29,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.index.IndexService; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; @@ -61,7 +65,7 @@ * * @opensearch.internal */ -public class SegmentReplicationTargetService implements IndexEventListener { +public class SegmentReplicationTargetService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener { private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class); @@ -144,6 +148,53 @@ public SegmentReplicationTargetService( ); } + @Override + protected void doStart() { + if (DiscoveryNode.isDataNode(clusterService.getSettings())) { + clusterService.addListener(this); + } + } + + @Override + protected void doStop() { + if (DiscoveryNode.isDataNode(clusterService.getSettings())) { + clusterService.removeListener(this); + } + } + + @Override + protected void doClose() throws IOException { + + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.routingTableChanged()) { + for (IndexService indexService : indicesService) { + if (indexService.getIndexSettings().isSegRepEnabled() && event.indexRoutingTableChanged(indexService.index().getName())) { + for (IndexShard shard : indexService) { + if (shard.routingEntry().primary() == false) { + // for this shard look up its primary routing, if it has completed a relocation trigger replication + final String previousNode = event.previousState() + .routingTable() + .shardRoutingTable(shard.shardId()) + .primaryShard() + .currentNodeId(); + final String currentNode = event.state() + .routingTable() + .shardRoutingTable(shard.shardId()) + .primaryShard() + .currentNodeId(); + if (previousNode.equals(currentNode) == false) { + processLatestReceivedCheckpoint(shard, Thread.currentThread()); + } + } + } + } + } + } + } + /** * Cancel any replications on this node for a replica that is about to be closed. */ @@ -395,7 +446,7 @@ private DiscoveryNode getPrimaryNode(ShardRouting primaryShard) { // visible to tests protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Thread thread) { final ReplicationCheckpoint latestPublishedCheckpoint = latestReceivedCheckpoint.get(replicaShard.shardId()); - if (latestPublishedCheckpoint != null && latestPublishedCheckpoint.isAheadOf(replicaShard.getLatestReplicationCheckpoint())) { + if (latestPublishedCheckpoint != null) { logger.trace( () -> new ParameterizedMessage( "Processing latest received checkpoint for shard {} {}", diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 3a4860a9bf5ff..4cbf8dc191a9d 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1414,6 +1414,7 @@ public Node start() throws NodeValidationException { assert transportService.getLocalNode().equals(localNodeFactory.getNode()) : "transportService has a different local node than the factory provided"; injector.getInstance(PeerRecoverySourceService.class).start(); + injector.getInstance(SegmentReplicationTargetService.class).start(); injector.getInstance(SegmentReplicationSourceService.class).start(); final RemoteClusterStateService remoteClusterStateService = injector.getInstance(RemoteClusterStateService.class); @@ -1602,6 +1603,7 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(IndicesStore.class)); toClose.add(injector.getInstance(PeerRecoverySourceService.class)); toClose.add(injector.getInstance(SegmentReplicationSourceService.class)); + toClose.add(injector.getInstance(SegmentReplicationTargetService.class)); toClose.add(() -> stopWatch.stop().start("cluster")); toClose.add(injector.getInstance(ClusterService.class)); toClose.add(() -> stopWatch.stop().start("node_connections_service")); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 252f3975bab25..f284a425a417b 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -12,11 +12,18 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.Version; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -24,6 +31,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.index.IndexService; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.shard.IndexShard; @@ -51,6 +59,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; @@ -91,6 +100,8 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { private SegmentReplicationState state; private ReplicationCheckpoint initialCheckpoint; + private ClusterState clusterState; + private static final long TRANSPORT_TIMEOUT = 30000;// 30sec @Override @@ -129,7 +140,7 @@ public void setUp() throws Exception { indicesService = mock(IndicesService.class); ClusterService clusterService = mock(ClusterService.class); - ClusterState clusterState = mock(ClusterState.class); + clusterState = mock(ClusterState.class); RoutingTable mockRoutingTable = mock(RoutingTable.class); when(clusterService.state()).thenReturn(clusterState); when(clusterState.routingTable()).thenReturn(mockRoutingTable); @@ -465,9 +476,22 @@ public void testStartReplicationListenerFailure() throws InterruptedException { verify(spy, (never())).updateVisibleCheckpoint(eq(0L), eq(replicaShard)); } - public void testDoNotProcessLatestCheckpointIfItIsbehind() { - sut.updateLatestReceivedCheckpoint(replicaShard.getLatestReplicationCheckpoint(), replicaShard); - assertFalse(sut.processLatestReceivedCheckpoint(replicaShard, null)); + public void testDoNotProcessLatestCheckpointIfCheckpointIsBehind() { + SegmentReplicationTargetService service = spy(sut); + doReturn(mock(SegmentReplicationTarget.class)).when(service).startReplication(any(), any(), any()); + ReplicationCheckpoint checkpoint = replicaShard.getLatestReplicationCheckpoint(); + service.updateLatestReceivedCheckpoint(checkpoint, replicaShard); + service.processLatestReceivedCheckpoint(replicaShard, null); + verify(service, times(0)).startReplication(eq(replicaShard), eq(checkpoint), any()); + } + + public void testProcessLatestCheckpointIfCheckpointAhead() { + SegmentReplicationTargetService service = spy(sut); + doNothing().when(service).startReplication(any()); + doReturn(mock(SegmentReplicationTarget.class)).when(service).startReplication(any(), any(), any()); + service.updateLatestReceivedCheckpoint(aheadCheckpoint, replicaShard); + service.processLatestReceivedCheckpoint(replicaShard, null); + verify(service, times(1)).startReplication(eq(replicaShard), eq(aheadCheckpoint), any()); } public void testOnNewCheckpointInvokedOnClosedShardDoesNothing() throws IOException { @@ -617,4 +641,46 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile target.cancel("test"); sut.startReplication(target); } + + public void testProcessCheckpointOnClusterStateUpdate() { + // set up mocks on indicies & index service to return our replica's index & shard. + IndexService indexService = mock(IndexService.class); + when(indexService.iterator()).thenReturn(Set.of(replicaShard).iterator()); + when(indexService.getIndexSettings()).thenReturn(replicaShard.indexSettings()); + when(indexService.index()).thenReturn(replicaShard.routingEntry().index()); + when(indicesService.iterator()).thenReturn(Set.of(indexService).iterator()); + + // create old & new cluster states + final String targetNodeId = "targetNodeId"; + ShardRouting initialRouting = primaryShard.routingEntry().relocate(targetNodeId, 0L); + assertEquals(ShardRoutingState.RELOCATING, initialRouting.state()); + + ShardRouting targetRouting = ShardRouting.newUnassigned( + primaryShard.shardId(), + true, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, "test") + ).initialize(targetNodeId, initialRouting.allocationId().getId(), 0L).moveToStarted(); + assertEquals(targetNodeId, targetRouting.currentNodeId()); + assertEquals(ShardRoutingState.STARTED, targetRouting.state()); + ClusterState oldState = ClusterState.builder(ClusterName.DEFAULT) + .routingTable( + RoutingTable.builder() + .add(IndexRoutingTable.builder(primaryShard.shardId().getIndex()).addShard(initialRouting).build()) + .build() + ) + .build(); + ClusterState newState = ClusterState.builder(ClusterName.DEFAULT) + .routingTable( + RoutingTable.builder() + .add(IndexRoutingTable.builder(primaryShard.shardId().getIndex()).addShard(targetRouting).build()) + .build() + ) + .build(); + + // spy so we can verify process is invoked + SegmentReplicationTargetService spy = spy(sut); + spy.clusterChanged(new ClusterChangedEvent("ignored", oldState, newState)); + verify(spy, times(1)).processLatestReceivedCheckpoint(eq(replicaShard), any()); + } }