Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Fix flaky test SegmentReplicationRemoteStoreIT.testPressureServiceStats #8855

Merged
merged 1 commit into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -876,69 +876,84 @@ public void testPressureServiceStats() throws Exception {
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);

// get shard references.
final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
logger.info("Replica aid {}", replicaShard.routingEntry().allocationId());
logger.info("former primary aid {}", primaryShard.routingEntry().allocationId());

// fetch pressure stats from the Primary's Node.
SegmentReplicationPressureService pressureService = internalCluster().getInstance(
SegmentReplicationPressureService.class,
primaryNode
);

final Map<ShardId, SegmentReplicationPerGroupStats> shardStats = pressureService.nodeStats().getShardStats();
assertEquals(1, shardStats.size());
final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
IndexShard replica = getIndexShard(replicaNode, INDEX_NAME);
SegmentReplicationPerGroupStats groupStats = shardStats.get(primaryShard.shardId());
Set<SegmentReplicationShardStats> replicaStats = groupStats.getReplicaStats();
assertEquals(1, replicaStats.size());

// assert replica node returns nothing.
// Fetch pressure stats from the Replica's Node we will assert replica node returns nothing until it is promoted.
SegmentReplicationPressureService replicaNode_service = internalCluster().getInstance(
SegmentReplicationPressureService.class,
replicaNode
);

final Map<ShardId, SegmentReplicationPerGroupStats> shardStats = pressureService.nodeStats().getShardStats();
assertEquals("We should have stats returned for the replication group", 1, shardStats.size());

SegmentReplicationPerGroupStats groupStats = shardStats.get(primaryShard.shardId());
Set<SegmentReplicationShardStats> replicaStats = groupStats.getReplicaStats();
assertAllocationIdsInReplicaShardStats(Set.of(replicaShard.routingEntry().allocationId().getId()), replicaStats);

assertTrue(replicaNode_service.nodeStats().getShardStats().isEmpty());

// drop the primary, this won't hand off SR state.
// drop the primary, this won't hand off pressure stats between old/new primary.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
ensureYellowAndNoInitializingShards(INDEX_NAME);
replicaNode_service = internalCluster().getInstance(SegmentReplicationPressureService.class, replicaNode);
replica = getIndexShard(replicaNode, INDEX_NAME);
assertTrue("replica should be promoted as a primary", replica.routingEntry().primary());
assertEquals(1, replicaNode_service.nodeStats().getShardStats().size());
// we don't have a replica assigned yet, so this should be 0.
assertEquals(0, replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats().size());

assertTrue("replica should be promoted as a primary", replicaShard.routingEntry().primary());
assertEquals(
"We should have stats returned for the replication group",
1,
replicaNode_service.nodeStats().getShardStats().size()
);
// after the primary is dropped and replica is promoted we won't have a replica assigned yet, so stats per replica should return
// empty.
replicaStats = replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats();
assertTrue(replicaStats.isEmpty());

// start another replica.
String replicaNode_2 = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
String docId = String.valueOf(initialDocCount + 1);
client().prepareIndex(INDEX_NAME).setId(docId).setSource("foo", "bar").get();
refresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount + 1, replicaNode_2);
final IndexShard secondReplicaShard = getIndexShard(replicaNode_2, INDEX_NAME);
final String second_replica_aid = secondReplicaShard.routingEntry().allocationId().getId();
waitForSearchableDocs(initialDocCount, replicaNode_2);

replicaNode_service = internalCluster().getInstance(SegmentReplicationPressureService.class, replicaNode);
replica = getIndexShard(replicaNode_2, INDEX_NAME);
assertEquals(1, replicaNode_service.nodeStats().getShardStats().size());
replicaStats = replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats();
assertEquals(1, replicaStats.size());
assertEquals(
"We should have stats returned for the replication group",
1,
replicaNode_service.nodeStats().getShardStats().size()
);
replicaStats = replicaNode_service.nodeStats().getShardStats().get(replicaShard.shardId()).getReplicaStats();
assertAllocationIdsInReplicaShardStats(Set.of(second_replica_aid), replicaStats);
final SegmentReplicationShardStats replica_entry = replicaStats.stream().findFirst().get();
assertEquals(replica_entry.getCheckpointsBehindCount(), 0);

// test a checkpoint without any new segments
flush(INDEX_NAME);
assertBusy(() -> {
final SegmentReplicationPressureService service = internalCluster().getInstance(
SegmentReplicationPressureService.class,
replicaNode
);
assertEquals(1, service.nodeStats().getShardStats().size());
final Set<SegmentReplicationShardStats> shardStatsSet = service.nodeStats()
assertEquals(1, replicaNode_service.nodeStats().getShardStats().size());
final Set<SegmentReplicationShardStats> shardStatsSet = replicaNode_service.nodeStats()
.getShardStats()
.get(primaryShard.shardId())
.get(replicaShard.shardId())
.getReplicaStats();
assertEquals(1, shardStatsSet.size());
assertAllocationIdsInReplicaShardStats(Set.of(second_replica_aid), shardStatsSet);
final SegmentReplicationShardStats stats = shardStatsSet.stream().findFirst().get();
assertEquals(0, stats.getCheckpointsBehindCount());
});
}
}

private void assertAllocationIdsInReplicaShardStats(Set<String> expected, Set<SegmentReplicationShardStats> replicaStats) {
assertEquals(expected, replicaStats.stream().map(SegmentReplicationShardStats::getAllocationId).collect(Collectors.toSet()));
}

/**
* Tests a scroll query on the replica
* @throws Exception when issue is encountered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -528,13 +532,27 @@ public void testFlushAfterRelocation() throws Exception {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}

// Verify segment replication event never happened on replica shard
final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);

// Verify segment replication event never happened on replica shard other than recovery.
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), 0);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), 0);

SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.execute()
.actionGet();
assertTrue(segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0).getReplicaStats().isEmpty());
final Set<SegmentReplicationShardStats> replicaStats = segmentReplicationStatsResponse.getReplicationStats()
.get(INDEX_NAME)
.get(0)
.getReplicaStats();
assertEquals(
Set.of(replicaShard.routingEntry().allocationId().getId()),
replicaStats.stream().map(SegmentReplicationShardStats::getAllocationId).collect(Collectors.toSet())
);
// the primary still has not refreshed to update its checkpoint, so our replica is not yet behind.
assertEquals(0, replicaStats.stream().findFirst().get().getCheckpointsBehindCount());

// Relocate primary to new primary. When new primary starts it does perform a flush.
logger.info("--> relocate the shard from primary to newPrimary");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L

private final Consumer<ReplicationGroup> onReplicationGroupUpdated;

private volatile ReplicationCheckpoint lastPublishedReplicationCheckpoint;
private volatile ReplicationCheckpoint latestReplicationCheckpoint;

/**
* Get all retention leases tracked on this shard.
Expand Down Expand Up @@ -1056,6 +1056,7 @@ public ReplicationTracker(
this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
this.safeCommitInfoSupplier = safeCommitInfoSupplier;
this.onReplicationGroupUpdated = onReplicationGroupUpdated;
this.latestReplicationCheckpoint = indexSettings.isSegRepEnabled() ? ReplicationCheckpoint.empty(shardId) : null;
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
assert invariant();
}
Expand Down Expand Up @@ -1214,26 +1215,42 @@ public synchronized void updateVisibleCheckpointForShard(final String allocation
*/
public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint checkpoint) {
assert indexSettings.isSegRepEnabled();
assert handoffInProgress == false;
if (checkpoint.equals(lastPublishedReplicationCheckpoint) == false) {
this.lastPublishedReplicationCheckpoint = checkpoint;
for (Map.Entry<String, CheckpointState> entry : checkpoints.entrySet()) {
if (entry.getKey().equals(this.shardAllocationId) == false) {
final CheckpointState cps = entry.getValue();
if (cps.inSync) {
cps.checkpointTimers.computeIfAbsent(checkpoint, ignored -> {
final ReplicationTimer replicationTimer = new ReplicationTimer();
replicationTimer.start();
return replicationTimer;
});
logger.trace(
() -> new ParameterizedMessage(
"updated last published checkpoint to {} - timers [{}]",
checkpoint,
cps.checkpointTimers.keySet()
)
);
}
if (checkpoint.equals(latestReplicationCheckpoint) == false) {
this.latestReplicationCheckpoint = checkpoint;
}
if (primaryMode) {
startReplicationLagTimers();
}
}

public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return this.latestReplicationCheckpoint;
}

private void startReplicationLagTimers() {
for (Map.Entry<String, CheckpointState> entry : checkpoints.entrySet()) {
final String allocationId = entry.getKey();
if (allocationId.equals(this.shardAllocationId) == false) {
final CheckpointState cps = entry.getValue();
// if the shard is in checkpoints but is unavailable or out of sync we will not track its replication state.
// it is possible for a shard to be in-sync but not yet removed from the checkpoints collection after a failover event.
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)) {
cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> {
final ReplicationTimer replicationTimer = new ReplicationTimer();
replicationTimer.start();
return replicationTimer;
});
logger.trace(
() -> new ParameterizedMessage(
"updated last published checkpoint for {} at visible cp {} to {} - timers [{}]",
allocationId,
cps.visibleReplicationCheckpoint,
latestReplicationCheckpoint,
cps.checkpointTimers.keySet()
)
);
}
}
}
Expand All @@ -1246,12 +1263,17 @@ public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint ch
*/
public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats() {
assert indexSettings.isSegRepEnabled();
final ReplicationCheckpoint lastPublishedCheckpoint = this.lastPublishedReplicationCheckpoint;
if (primaryMode && lastPublishedCheckpoint != null) {
if (primaryMode) {
return this.checkpoints.entrySet()
.stream()
.filter(entry -> entry.getKey().equals(this.shardAllocationId) == false && entry.getValue().inSync)
.map(entry -> buildShardStats(lastPublishedCheckpoint.getLength(), entry.getKey(), entry.getValue()))
// filter out this shard's allocation id, any shards that are out of sync or unavailable (shard marked in-sync but has not
// been assigned to a node).
.filter(
entry -> entry.getKey().equals(this.shardAllocationId) == false
&& entry.getValue().inSync
&& replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
)
.map(entry -> buildShardStats(latestReplicationCheckpoint.getLength(), entry.getKey(), entry.getValue()))
.collect(Collectors.toUnmodifiableSet());
}
return Collections.emptySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public class ReplicationCheckpoint implements Writeable, Comparable<ReplicationC
private final long length;
private final String codec;

public static ReplicationCheckpoint empty(ShardId shardId) {
return empty(shardId, "");
}

public static ReplicationCheckpoint empty(ShardId shardId, String codec) {
return new ReplicationCheckpoint(shardId, codec);
}
Expand Down
Loading