diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java index e8d60523ea0e1..cb5e2e911705b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java @@ -663,6 +663,29 @@ public void testStatsCorrectnessOnFailover() { logger.info("Test completed"); } + public void testZeroLagOnCreateIndex() throws InterruptedException { + setup(); + String clusterManagerNode = internalCluster().getClusterManagerName(); + + int numOfShards = randomIntBetween(1, 3); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, numOfShards)); + ensureGreen(INDEX_NAME); + long currentTimeNs = System.nanoTime(); + while (currentTimeNs == System.nanoTime()) { + Thread.sleep(10); + } + + for (int i = 0; i < numOfShards; i++) { + RemoteStoreStatsResponse response = client(clusterManagerNode).admin() + .cluster() + .prepareRemoteStoreStats(INDEX_NAME, String.valueOf(i)) + .get(); + for (RemoteStoreStats remoteStoreStats : response.getRemoteStoreStats()) { + assertEquals(0, remoteStoreStats.getSegmentStats().refreshTimeLagMs); + } + } + } + private void indexDocs() { for (int i = 0; i < randomIntBetween(5, 10); i++) { if (randomBoolean()) { diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 8b0088a8e1127..3df07495f5b80 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -86,7 +86,7 @@ public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshL private final RemoteSegmentStoreDirectory remoteDirectory; private final RemoteSegmentTransferTracker segmentTracker; private final Map localSegmentChecksumMap; - private long primaryTerm; + private volatile long primaryTerm; private volatile Iterator backoffDelayIterator; private final SegmentReplicationCheckpointPublisher checkpointPublisher; @@ -126,10 +126,9 @@ protected void runAfterRefreshExactlyOnce(boolean didRefresh) { // We have 2 separate methods to check if sync needs to be done or not. This is required since we use the return boolean // from isReadyForUpload to schedule refresh retries as the index shard or the primary mode are not in complete // ready state. - if (shouldSync(didRefresh) && isReadyForUpload()) { - segmentTracker.updateLocalRefreshTimeAndSeqNo(); + if (shouldSync(didRefresh, true) && isReadyForUpload()) { try { - initializeRemoteDirectoryOnTermUpdate(); + segmentTracker.updateLocalRefreshTimeAndSeqNo(); try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { Collection localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true); updateLocalSizeMapAndTracker(localSegmentsPostRefresh); @@ -150,7 +149,7 @@ protected void runAfterRefreshExactlyOnce(boolean didRefresh) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { boolean successful; - if (shouldSync(didRefresh)) { + if (shouldSync(didRefresh, false)) { successful = syncSegments(); } else { successful = true; @@ -158,10 +157,15 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) { return successful; } - private boolean shouldSync(boolean didRefresh) { - return this.primaryTerm != indexShard.getOperationPrimaryTerm() - // If the readers change, didRefresh is always true. - || didRefresh + /** + * This checks if there is a sync required to remote. + * + * @param didRefresh if the readers changed. + * @param skipPrimaryTermCheck consider change in primary term or not for should sync + * @return true if sync is needed + */ + private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) { + boolean shouldSync = didRefresh // If the readers change, didRefresh is always true. // The third condition exists for uploading the zero state segments where the refresh has not changed the reader // reference, but it is important to upload the zero state segments so that the restore does not break. || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty() @@ -169,6 +173,10 @@ private boolean shouldSync(boolean didRefresh) { // we update the primary term and the same condition would not evaluate to true again in syncSegments. // Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call. || isRefreshAfterCommitSafe(); + if (shouldSync || skipPrimaryTermCheck) { + return shouldSync; + } + return this.primaryTerm != indexShard.getOperationPrimaryTerm(); } private boolean syncSegments() { @@ -188,6 +196,7 @@ private boolean syncSegments() { try { try { + initializeRemoteDirectoryOnTermUpdate(); // if a new segments_N file is present in local that is not uploaded to remote store yet, it // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. // This is done to avoid delete post each refresh.