Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Remote Store] Fix refresh lag bug on primary term change #10935

Merged
merged 1 commit into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,29 @@ public void testStatsCorrectnessOnFailover() {
logger.info("Test completed");
}

/**
 * Creates an index with a random number of shards, waits for it to go green, and then asserts that every
 * {@code RemoteStoreStats} entry returned for each shard reports a refresh time lag of zero.
 */
public void testZeroLagOnCreateIndex() throws InterruptedException {
    setup();
    final String clusterManager = internalCluster().getClusterManagerName();

    final int shardCount = randomIntBetween(1, 3);
    createIndex(INDEX_NAME, remoteStoreIndexSettings(1, shardCount));
    ensureGreen(INDEX_NAME);

    // Make sure the nano clock has moved past the instant captured here before the stats are fetched.
    final long startNs = System.nanoTime();
    while (System.nanoTime() == startNs) {
        Thread.sleep(10);
    }

    for (int shardId = 0; shardId < shardCount; shardId++) {
        final RemoteStoreStatsResponse statsResponse = client(clusterManager).admin()
            .cluster()
            .prepareRemoteStoreStats(INDEX_NAME, String.valueOf(shardId))
            .get();
        for (RemoteStoreStats shardStats : statsResponse.getRemoteStoreStats()) {
            assertEquals(0, shardStats.getSegmentStats().refreshTimeLagMs);
        }
    }
}

private void indexDocs() {
for (int i = 0; i < randomIntBetween(5, 10); i++) {
if (randomBoolean()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshL
private final RemoteSegmentStoreDirectory remoteDirectory;
private final RemoteSegmentTransferTracker segmentTracker;
private final Map<String, String> localSegmentChecksumMap;
private long primaryTerm;
private volatile long primaryTerm;
private volatile Iterator<TimeValue> backoffDelayIterator;
private final SegmentReplicationCheckpointPublisher checkpointPublisher;

Expand Down Expand Up @@ -126,10 +126,9 @@ protected void runAfterRefreshExactlyOnce(boolean didRefresh) {
// We have 2 separate methods to check if sync needs to be done or not. This is required since we use the return boolean
// from isReadyForUpload to schedule refresh retries as the index shard or the primary mode are not in complete
// ready state.
if (shouldSync(didRefresh) && isReadyForUpload()) {
segmentTracker.updateLocalRefreshTimeAndSeqNo();
if (shouldSync(didRefresh, true) && isReadyForUpload()) {
try {
initializeRemoteDirectoryOnTermUpdate();
segmentTracker.updateLocalRefreshTimeAndSeqNo();
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
Collection<String> localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true);
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
Expand All @@ -150,25 +149,34 @@ protected void runAfterRefreshExactlyOnce(boolean didRefresh) {
@Override
protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
boolean successful;
if (shouldSync(didRefresh)) {
if (shouldSync(didRefresh, false)) {
successful = syncSegments();
} else {
successful = true;
}
return successful;
}

private boolean shouldSync(boolean didRefresh) {
return this.primaryTerm != indexShard.getOperationPrimaryTerm()
// If the readers change, didRefresh is always true.
|| didRefresh
/**
* Decides whether segments need to be synced to the remote store.
* <p>
* The result is true when the readers changed, when no segments have been uploaded to the remote store yet, or when
* {@code isRefreshAfterCommitSafe()} returns true. If none of these hold and {@code skipPrimaryTermCheck} is false,
* the result is whether the primary term tracked by this listener differs from the shard's operation primary term.
*
* @param didRefresh true if the readers changed.
* @param skipPrimaryTermCheck if true, a change in primary term on its own is not treated as a reason to sync;
*                             if false, a primary term change is used as the final fallback condition.
* @return true if sync is needed
*/
private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
boolean shouldSync = didRefresh // If the readers change, didRefresh is always true.
// The second condition exists for uploading the zero state segments where the refresh has not changed the reader
// reference, but it is important to upload the zero state segments so that the restore does not break.
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()
// When shouldSync is called the first time, the primary term condition (now evaluated last, and only when
// skipPrimaryTermCheck is false) can be true. But after that we update the primary term and the same condition
// would not evaluate to true again in syncSegments.
// Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call.
// NOTE(review): this explanation predates the skipPrimaryTermCheck parameter — confirm it still matches intent.
|| isRefreshAfterCommitSafe();
// Either a sync is already required, or the caller asked to ignore primary term changes: the answer is final.
if (shouldSync || skipPrimaryTermCheck) {
return shouldSync;
}
// Otherwise, sync only if the primary term tracked here differs from the shard's operation primary term.
return this.primaryTerm != indexShard.getOperationPrimaryTerm();
}

private boolean syncSegments() {
Expand All @@ -188,6 +196,7 @@ private boolean syncSegments() {

try {
try {
initializeRemoteDirectoryOnTermUpdate();
// if a new segments_N file is present in local that is not uploaded to remote store yet, it
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
// This is done to avoid delete post each refresh.
Expand Down