diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
index 94f376d923689..3a1faa2b9bcde 100644
--- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
@@ -71,6 +71,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -1232,10 +1233,20 @@ private void createReplicationLagTimers() {
             final String allocationId = entry.getKey();
             if (allocationId.equals(this.shardAllocationId) == false) {
                 final CheckpointState cps = entry.getValue();
+                // find the routing entry for this allocation id; a lag timer is only created when one exists and is not a primary.
+                // entries whose allocationId() is null (unassigned copies, presumably) are skipped first to avoid a null dereference.
+                final Optional<ShardRouting> shardRouting = replicationGroup.getRoutingTable()
+                    .shards()
+                    .stream()
+                    .filter(routing -> Objects.nonNull(routing.allocationId()))
+                    .filter(routing -> routing.allocationId().getId().equals(allocationId))
+                    .findAny();
                 // if the shard is in checkpoints but is unavailable or out of sync we will not track its replication state.
                 // it is possible for a shard to be in-sync but not yet removed from the checkpoints collection after a failover event.
                 if (cps.inSync
                     && replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
+                    && shardRouting.isPresent()
+                    && shardRouting.get().primary() == false
                     && latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)) {
                     cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> new SegmentReplicationLagTimer());
                     logger.trace(
@@ -1287,11 +1298,21 @@ public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats
                 .stream()
                 // filter out this shard's allocation id, any shards that are out of sync or unavailable (shard marked in-sync but has not
                 // been assigned to a node).
-                .filter(
-                    entry -> entry.getKey().equals(this.shardAllocationId) == false
+                .filter(entry -> {
+                    // also drop copies that have no routing entry or are routed as a primary (such as a primary relocation target).
+                    // entries whose allocationId() is null (unassigned copies, presumably) are skipped first to avoid a null dereference.
+                    final Optional<ShardRouting> shardRouting = replicationGroup.getRoutingTable()
+                        .shards()
+                        .stream()
+                        .filter(routing -> Objects.nonNull(routing.allocationId()))
+                        .filter(routing -> routing.allocationId().getId().equals(entry.getKey()))
+                        .findAny();
+                    return entry.getKey().equals(this.shardAllocationId) == false
                         && entry.getValue().inSync
                         && replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
-                )
+                        && shardRouting.isPresent()
+                        && shardRouting.get().primary() == false;
+                })
                 .map(entry -> buildShardStats(entry.getKey(), entry.getValue()))
                 .collect(Collectors.toUnmodifiableSet());
         }
diff --git a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java
index 28c95ddf13fc4..6783cb391e981 100644
--- a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java
+++ b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java
@@ -1907,6 +1907,90 @@ public void testSegmentReplicationCheckpointTracking() {
         }
     }
 
+    /**
+     * Builds a routing table with a started primary that names a relocation node, an initializing primary on that node and one more
+     * initializing copy, then expects the segment replication stats to hold one entry fewer than there are initializing copies.
+     */
+    public void testSegmentReplicationCheckpointForRelocatingPrimary() {
+        Settings settings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
+        final long initialClusterStateVersion = randomNonNegativeLong();
+        final int numberOfActiveAllocationsIds = 2;
+        final int numberOfInitializingIds = 2;
+        final Tuple<Set<AllocationId>, Set<AllocationId>> activeAndInitializingAllocationIds = randomActiveAndInitializingAllocationIds(
+            numberOfActiveAllocationsIds,
+            numberOfInitializingIds
+        );
+        final Set<AllocationId> activeAllocationIds = activeAndInitializingAllocationIds.v1();
+        final Set<AllocationId> initializingIds = activeAndInitializingAllocationIds.v2();
+
+        AllocationId targetAllocationId = initializingIds.iterator().next();
+        AllocationId primaryId = activeAllocationIds.iterator().next();
+        String relocatingToNodeId = nodeIdFromAllocationId(targetAllocationId);
+
+        logger.info("--> activeAllocationIds {} Primary {}", activeAllocationIds, primaryId.getId());
+        logger.info("--> initializingIds {} Target {}", initializingIds, targetAllocationId);
+
+        final ShardId shardId = new ShardId("test", "_na_", 0);
+        final IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(shardId);
+        for (final AllocationId initializingId : initializingIds) {
+            // only the chosen target is routed as a primary; the remaining initializing copy is routed as a replica
+            boolean primaryRelocationTarget = initializingId.equals(targetAllocationId);
+            builder.addShard(
+                TestShardRouting.newShardRouting(
+                    shardId,
+                    nodeIdFromAllocationId(initializingId),
+                    null,
+                    primaryRelocationTarget,
+                    ShardRoutingState.INITIALIZING,
+                    initializingId
+                )
+            );
+        }
+        // the started primary names the target's node as its relocating node
+        builder.addShard(
+            TestShardRouting.newShardRouting(
+                shardId,
+                nodeIdFromAllocationId(primaryId),
+                relocatingToNodeId,
+                true,
+                ShardRoutingState.STARTED,
+                primaryId
+            )
+        );
+        IndexShardRoutingTable routingTable = builder.build();
+        final ReplicationTracker tracker = newTracker(primaryId, settings);
+        tracker.updateFromClusterManager(initialClusterStateVersion, ids(activeAllocationIds), routingTable);
+        tracker.activatePrimaryMode(NO_OPS_PERFORMED);
+        assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(ids(activeAllocationIds)));
+        assertThat(tracker.getReplicationGroup().getRoutingTable(), equalTo(routingTable));
+        assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()).inSync));
+        initializingIds.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));
+
+        final StoreFileMetadata segment_1 = new StoreFileMetadata("segment_1", 5L, "abcd", Version.LATEST);
+        final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(
+            tracker.shardId(),
+            0L,
+            1,
+            1,
+            5L,
+            Codec.getDefault().getName(),
+            Map.of("segment_1", segment_1)
+        );
+        tracker.setLatestReplicationCheckpoint(initialCheckpoint);
+        tracker.startReplicationLagTimers(initialCheckpoint);
+
+        final Set<String> expectedIds = ids(initializingIds);
+
+        // one initializing copy is routed as a primary above, so one entry fewer than the initializing ids is expected
+        Set<SegmentReplicationShardStats> groupStats = tracker.getSegmentReplicationStats();
+        assertEquals(expectedIds.size() - 1, groupStats.size());
+        for (SegmentReplicationShardStats shardStat : groupStats) {
+            assertEquals(1, shardStat.getCheckpointsBehindCount());
+            assertEquals(5L, shardStat.getBytesBehindCount());
+            assertTrue(shardStat.getCurrentReplicationLagMillis() >= shardStat.getCurrentReplicationTimeMillis());
+        }
+    }
+
     public void testSegmentReplicationCheckpointTrackingInvalidAllocationIDs() {
         Settings settings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
         final long initialClusterStateVersion = randomNonNegativeLong();