diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index f48df082a25dc..a2996d87a851b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -22,6 +22,7 @@ import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; +import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse; import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; @@ -62,6 +63,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.IndexModule; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.SegmentReplicationShardStats; @@ -94,6 +96,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static java.util.Arrays.asList; @@ -1776,4 +1779,58 @@ public void testRealtimeTermVectorRequestsUnSuccessful() throws IOException { assertThat(response.getIndex(), equalTo(INDEX_NAME)); } + + public void testReplicaAlreadyAtCheckpoint() throws Exception { + final List nodes = new ArrayList<>(); + final String primaryNode = internalCluster().startDataOnlyNode(); + nodes.add(primaryNode); + final Settings settings = Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build(); + createIndex(INDEX_NAME, settings); + ensureGreen(INDEX_NAME); + // start a replica node, initially will be empty with no shard assignment. + final String replicaNode = internalCluster().startDataOnlyNode(); + nodes.add(replicaNode); + final String replicaNode2 = internalCluster().startDataOnlyNode(); + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)) + ); + ensureGreen(INDEX_NAME); + + // index a doc. + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", randomInt()).get(); + refresh(INDEX_NAME); + + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + IndexShard replica_1 = getIndexShard(replicaNode, INDEX_NAME); + IndexShard replica_2 = getIndexShard(replicaNode2, INDEX_NAME); + // wait until a replica is promoted & finishes engine flip, we don't care which one + AtomicReference primary = new AtomicReference<>(); + assertBusy(() -> { + assertTrue("replica should be promoted as a primary", replica_1.routingEntry().primary() || replica_2.routingEntry().primary()); + primary.set(replica_1.routingEntry().primary() ? replica_1 : replica_2); + }); + + FlushRequest request = new FlushRequest(INDEX_NAME); + request.force(true); + primary.get().flush(request); + + assertBusy(() -> { + assertEquals( + replica_1.getLatestReplicationCheckpoint().getSegmentInfosVersion(), + replica_2.getLatestReplicationCheckpoint().getSegmentInfosVersion() + ); + }); + + assertBusy(() -> { + ClusterStatsResponse clusterStatsResponse = client().admin().cluster().prepareClusterStats().get(); + ReplicationStats replicationStats = clusterStatsResponse.getIndicesStats().getSegments().getReplicationStats(); + assertEquals(0L, replicationStats.maxBytesBehind); + assertEquals(0L, replicationStats.maxReplicationLag); + assertEquals(0L, replicationStats.totalBytesBehind); + }); + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 46095adfe96b4..73da0482537ad 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -53,6 +53,7 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; +import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.UPDATE_VISIBLE_CHECKPOINT; /** @@ -282,6 +283,12 @@ public void onReplicationFailure( } } }); + } else if (replicaShard.isSegmentReplicationAllowed()) { + // if we didn't process the checkpoint because we are up to date, + // send our latest checkpoint to the primary to update tracking. + // replicationId is not used by the primary set to a default value. + final long replicationId = NO_OPS_PERFORMED; + updateVisibleCheckpoint(replicationId, replicaShard); } } else { logger.trace( diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 7b02635525264..252f3975bab25 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -55,7 +55,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; -import static org.junit.Assert.assertEquals; +import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.atLeastOnce; @@ -247,6 +247,7 @@ public void testAlreadyOnNewCheckpoint() { SegmentReplicationTargetService spy = spy(sut); spy.onNewCheckpoint(replicaShard.getLatestReplicationCheckpoint(), replicaShard); verify(spy, times(0)).startReplication(any(), any(), any()); + verify(spy, times(1)).updateVisibleCheckpoint(NO_OPS_PERFORMED, replicaShard); } @TestLogging(reason = "Getting trace logs from replication package", value = "org.opensearch.indices.replication:TRACE")