Skip to content

Commit

Permalink
[Segment Replication] Remove primary targets from replication tracker (
Browse files Browse the repository at this point in the history
…#11011)

Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 authored Oct 31, 2023
1 parent 9d85e56 commit 9c65350
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -1227,6 +1228,14 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return this.latestReplicationCheckpoint;
}

private boolean isPrimaryRelocation(String allocationId) {
Optional<ShardRouting> shardRouting = routingTable.shards()
.stream()
.filter(routing -> routing.allocationId().getId().equals(allocationId))
.findAny();
return shardRouting.isPresent() && shardRouting.get().primary();
}

private void createReplicationLagTimers() {
for (Map.Entry<String, CheckpointState> entry : checkpoints.entrySet()) {
final String allocationId = entry.getKey();
Expand All @@ -1236,6 +1245,7 @@ private void createReplicationLagTimers() {
// it is possible for a shard to be in-sync but not yet removed from the checkpoints collection after a failover event.
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& isPrimaryRelocation(allocationId) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)) {
cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> new SegmentReplicationLagTimer());
logger.trace(
Expand Down Expand Up @@ -1267,6 +1277,7 @@ public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpo
final CheckpointState cps = e.getValue();
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& isPrimaryRelocation(e.getKey()) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
&& cps.checkpointTimers.containsKey(latestReplicationCheckpoint)) {
cps.checkpointTimers.get(latestReplicationCheckpoint).start();
Expand All @@ -1291,6 +1302,7 @@ public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats
entry -> entry.getKey().equals(this.shardAllocationId) == false
&& entry.getValue().inSync
&& replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
&& isPrimaryRelocation(entry.getKey()) == false
)
.map(entry -> buildShardStats(entry.getKey(), entry.getValue()))
.collect(Collectors.toUnmodifiableSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1907,6 +1907,86 @@ public void testSegmentReplicationCheckpointTracking() {
}
}

public void testSegmentReplicationCheckpointForRelocatingPrimary() {
Settings settings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
final long initialClusterStateVersion = randomNonNegativeLong();
final int numberOfActiveAllocationsIds = randomIntBetween(2, 2);
final int numberOfInitializingIds = randomIntBetween(2, 2);
final Tuple<Set<AllocationId>, Set<AllocationId>> activeAndInitializingAllocationIds = randomActiveAndInitializingAllocationIds(
numberOfActiveAllocationsIds,
numberOfInitializingIds
);
final Set<AllocationId> activeAllocationIds = activeAndInitializingAllocationIds.v1();
final Set<AllocationId> initializingIds = activeAndInitializingAllocationIds.v2();

AllocationId targetAllocationId = initializingIds.iterator().next();
AllocationId primaryId = activeAllocationIds.iterator().next();
String relocatingToNodeId = nodeIdFromAllocationId(targetAllocationId);

logger.info("--> activeAllocationIds {} Primary {}", activeAllocationIds, primaryId.getId());
logger.info("--> initializingIds {} Target {}", initializingIds, targetAllocationId);

final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(shardId);
for (final AllocationId initializingId : initializingIds) {
boolean primaryRelocationTarget = initializingId.equals(targetAllocationId);
builder.addShard(
TestShardRouting.newShardRouting(
shardId,
nodeIdFromAllocationId(initializingId),
null,
primaryRelocationTarget,
ShardRoutingState.INITIALIZING,
initializingId
)
);
}
builder.addShard(
TestShardRouting.newShardRouting(
shardId,
nodeIdFromAllocationId(primaryId),
relocatingToNodeId,
true,
ShardRoutingState.STARTED,
primaryId
)
);
IndexShardRoutingTable routingTable = builder.build();
final ReplicationTracker tracker = newTracker(primaryId, settings);
tracker.updateFromClusterManager(initialClusterStateVersion, ids(activeAllocationIds), routingTable);
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(ids(activeAllocationIds)));
assertThat(tracker.getReplicationGroup().getRoutingTable(), equalTo(routingTable));
assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()).inSync));
initializingIds.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));

final StoreFileMetadata segment_1 = new StoreFileMetadata("segment_1", 5L, "abcd", Version.LATEST);
final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
1,
1,
5L,
Codec.getDefault().getName(),
Map.of("segment_1", segment_1)
);
tracker.setLatestReplicationCheckpoint(initialCheckpoint);
tracker.startReplicationLagTimers(initialCheckpoint);

final Set<String> expectedIds = initializingIds.stream()
.filter(id -> id.equals(targetAllocationId))
.map(AllocationId::getId)
.collect(Collectors.toSet());

Set<SegmentReplicationShardStats> groupStats = tracker.getSegmentReplicationStats();
assertEquals(expectedIds.size(), groupStats.size());
for (SegmentReplicationShardStats shardStat : groupStats) {
assertEquals(1, shardStat.getCheckpointsBehindCount());
assertEquals(5L, shardStat.getBytesBehindCount());
assertTrue(shardStat.getCurrentReplicationLagMillis() >= shardStat.getCurrentReplicationTimeMillis());
}
}

public void testSegmentReplicationCheckpointTrackingInvalidAllocationIDs() {
Settings settings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
final long initialClusterStateVersion = randomNonNegativeLong();
Expand Down

0 comments on commit 9c65350

Please sign in to comment.