Skip to content

Commit

Permalink
[Segment Replication] Remove primary targets from replication tracker
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 committed Oct 30, 2023
1 parent 9d85e56 commit 430360e
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -1232,10 +1233,17 @@ private void createReplicationLagTimers() {
final String allocationId = entry.getKey();
if (allocationId.equals(this.shardAllocationId) == false) {
final CheckpointState cps = entry.getValue();
Optional<ShardRouting> shardRouting = replicationGroup.getRoutingTable()
.shards()
.stream()
.filter(routing -> routing.allocationId().getId().equals(allocationId))
.findAny();
// if the shard is in checkpoints but is unavailable or out of sync we will not track its replication state.
// it is possible for a shard to be in-sync but not yet removed from the checkpoints collection after a failover event.
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& shardRouting.isPresent()
&& shardRouting.get().primary() == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)) {
cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> new SegmentReplicationLagTimer());
logger.trace(
Expand Down Expand Up @@ -1287,11 +1295,18 @@ public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats
.stream()
// filter out this shard's allocation id, any shards that are out of sync or unavailable (shard marked in-sync but has not
// been assigned to a node).
.filter(
entry -> entry.getKey().equals(this.shardAllocationId) == false
.filter(entry -> {
Optional<ShardRouting> shardRouting = replicationGroup.getRoutingTable()
.shards()
.stream()
.filter(routing -> routing.allocationId().getId().equals(entry.getKey()))
.findAny();
return entry.getKey().equals(this.shardAllocationId) == false
&& entry.getValue().inSync
&& replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
)
&& shardRouting.isPresent()
&& shardRouting.get().primary() == false;
})
.map(entry -> buildShardStats(entry.getKey(), entry.getValue()))
.collect(Collectors.toUnmodifiableSet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1907,6 +1907,92 @@ public void testSegmentReplicationCheckpointTracking() {
}
}

public void testSegmentReplicationCheckpointForRelocatingPrimary() {
Settings settings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
final long initialClusterStateVersion = randomNonNegativeLong();
final int numberOfActiveAllocationsIds = randomIntBetween(2, 2);
final int numberOfInitializingIds = randomIntBetween(2, 2);
final Tuple<Set<AllocationId>, Set<AllocationId>> activeAndInitializingAllocationIds = randomActiveAndInitializingAllocationIds(
numberOfActiveAllocationsIds,
numberOfInitializingIds
);
final Set<AllocationId> activeAllocationIds = activeAndInitializingAllocationIds.v1();
final Set<AllocationId> initializingIds = activeAndInitializingAllocationIds.v2();

AllocationId targetAllocationId = initializingIds.iterator().next();
AllocationId primaryId = activeAllocationIds.iterator().next();
String relocatingToNodeId = nodeIdFromAllocationId(targetAllocationId);

logger.info("--> activeAllocationIds {} Primary {}", activeAllocationIds, primaryId.getId());
logger.info("--> initializingIds {} Target {}", initializingIds, targetAllocationId);

final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(shardId);
for (final AllocationId initializingId : initializingIds) {
boolean primaryRelocationTarget = initializingId.equals(targetAllocationId);
builder.addShard(
TestShardRouting.newShardRouting(
shardId,
nodeIdFromAllocationId(initializingId),
null,
primaryRelocationTarget,
ShardRoutingState.INITIALIZING,
initializingId
)
);
}
builder.addShard(
TestShardRouting.newShardRouting(
shardId,
nodeIdFromAllocationId(primaryId),
relocatingToNodeId,
true,
ShardRoutingState.STARTED,
primaryId
)
);
IndexShardRoutingTable routingTable = builder.build();
final ReplicationTracker tracker = newTracker(primaryId, settings);
tracker.updateFromClusterManager(initialClusterStateVersion, ids(activeAllocationIds), routingTable);
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(ids(activeAllocationIds)));
assertThat(tracker.getReplicationGroup().getRoutingTable(), equalTo(routingTable));
assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()).inSync));
initializingIds.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));

final StoreFileMetadata segment_1 = new StoreFileMetadata("segment_1", 5L, "abcd", Version.LATEST);
final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
1,
1,
5L,
Codec.getDefault().getName(),
Map.of("segment_1", segment_1)
);
tracker.setLatestReplicationCheckpoint(initialCheckpoint);
tracker.startReplicationLagTimers(initialCheckpoint);

final Set<String> expectedIds = ids(initializingIds);

Set<SegmentReplicationShardStats> groupStats = tracker.getSegmentReplicationStats();
assertEquals(expectedIds.size() - 1, groupStats.size());
for (SegmentReplicationShardStats shardStat : groupStats) {
assertEquals(1, shardStat.getCheckpointsBehindCount());
assertEquals(5L, shardStat.getBytesBehindCount());
assertTrue(shardStat.getCurrentReplicationLagMillis() >= shardStat.getCurrentReplicationTimeMillis());
}

// simulate replicas moved up to date.
// final Map<String, ReplicationTracker.CheckpointState> checkpoints = tracker.checkpoints;
// for (String id : expectedIds) {
// final ReplicationTracker.CheckpointState checkpointState = checkpoints.get(id);
// assertEquals(3, checkpointState.checkpointTimers.size());
// tracker.updateVisibleCheckpointForShard(id, initialCheckpoint);
// assertEquals(2, checkpointState.checkpointTimers.size());
// }
}

public void testSegmentReplicationCheckpointTrackingInvalidAllocationIDs() {
Settings settings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
final long initialClusterStateVersion = randomNonNegativeLong();
Expand Down

0 comments on commit 430360e

Please sign in to comment.