Skip to content

Commit

Permalink
[Remote Store] Use time elapsed since last successful local refresh f…
Browse files Browse the repository at this point in the history
…or refresh lag (#10803)

* [Remote Store] Use time elapsed since last successful local refresh for time lag

Signed-off-by: Ashish Singh <[email protected]>

* Incorporate PR review comments

Signed-off-by: Ashish Singh <[email protected]>

---------

Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 authored Oct 21, 2023
1 parent 7c5a806 commit 14d4a63
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void testWritesRejectedDueToBytesLagBreach() throws Exception {
public void testWritesRejectedDueToTimeLagBreach() throws Exception {
// Initially indexing happens with doc size of 1KB, then all remote store interactions start failing. Now, the
// indexing happens with doc size of 1 byte leading to time lag limit getting exceeded and leading to rejections.
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 15, "time_lag");
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 3, "time_lag");
}

private void validateBackpressure(
Expand Down Expand Up @@ -133,11 +133,13 @@ private RemoteSegmentTransferTracker.Stats stats() {
return matches.get(0).getSegmentStats();
}

private void indexDocAndRefresh(BytesReference source, int iterations) {
private void indexDocAndRefresh(BytesReference source, int iterations) throws InterruptedException {
for (int i = 0; i < iterations; i++) {
client().prepareIndex(INDEX_NAME).setSource(source, MediaTypeRegistry.JSON).get();
refresh(INDEX_NAME);
}
Thread.sleep(250);
client().prepareIndex(INDEX_NAME).setSource(source, MediaTypeRegistry.JSON).get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -66,6 +67,12 @@ public class RemoteSegmentTransferTracker extends RemoteTransferTracker {
*/
private volatile long remoteRefreshTimeMs;

/**
* This is the time of first local refresh after the last successful remote refresh. When the remote store is in
* sync with local refresh, this will be reset to -1.
*/
private volatile long remoteRefreshStartTimeMs = -1;

/**
* The refresh time(clock) of most recent remote refresh.
*/
Expand All @@ -76,11 +83,6 @@ public class RemoteSegmentTransferTracker extends RemoteTransferTracker {
*/
private volatile long refreshSeqNoLag;

/**
* Keeps the time (ms) lag computed so that we do not compute it for every request.
*/
private volatile long timeMsLag;

/**
* Keeps track of the total bytes of segment files which were uploaded to remote store during last successful remote refresh
*/
Expand Down Expand Up @@ -132,14 +134,19 @@ public RemoteSegmentTransferTracker(
logger = Loggers.getLogger(getClass(), shardId);
// Both the local refresh time and remote refresh time are set with current time to give consistent view of time lag when it arises.
long currentClockTimeMs = System.currentTimeMillis();
long currentTimeMs = System.nanoTime() / 1_000_000L;
long currentTimeMs = currentTimeMsUsingSystemNanos();
localRefreshTimeMs = currentTimeMs;
remoteRefreshTimeMs = currentTimeMs;
remoteRefreshStartTimeMs = currentTimeMs;
localRefreshClockTimeMs = currentClockTimeMs;
remoteRefreshClockTimeMs = currentClockTimeMs;
this.directoryFileTransferTracker = directoryFileTransferTracker;
}

public static long currentTimeMsUsingSystemNanos() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}

@Override
public void incrementTotalUploadsFailed() {
super.incrementTotalUploadsFailed();
Expand Down Expand Up @@ -180,19 +187,22 @@ public long getLocalRefreshClockTimeMs() {
*/
public void updateLocalRefreshTimeAndSeqNo() {
updateLocalRefreshClockTimeMs(System.currentTimeMillis());
updateLocalRefreshTimeMs(System.nanoTime() / 1_000_000L);
updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos());
updateLocalRefreshSeqNo(getLocalRefreshSeqNo() + 1);
}

// Visible for testing
void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
synchronized void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
assert localRefreshTimeMs >= this.localRefreshTimeMs : "newLocalRefreshTimeMs="
+ localRefreshTimeMs
+ " < "
+ "currentLocalRefreshTimeMs="
+ this.localRefreshTimeMs;
boolean isRemoteInSyncBeforeLocalRefresh = this.localRefreshTimeMs == this.remoteRefreshTimeMs;
this.localRefreshTimeMs = localRefreshTimeMs;
computeTimeMsLag();
if (isRemoteInSyncBeforeLocalRefresh) {
this.remoteRefreshStartTimeMs = localRefreshTimeMs;
}
}

private void updateLocalRefreshClockTimeMs(long localRefreshClockTimeMs) {
Expand Down Expand Up @@ -221,14 +231,18 @@ long getRemoteRefreshClockTimeMs() {
return remoteRefreshClockTimeMs;
}

public void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) {
assert remoteRefreshTimeMs >= this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs="
+ remoteRefreshTimeMs
public synchronized void updateRemoteRefreshTimeMs(long refreshTimeMs) {
assert refreshTimeMs >= this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs="
+ refreshTimeMs
+ " < "
+ "currentRemoteRefreshTimeMs="
+ this.remoteRefreshTimeMs;
this.remoteRefreshTimeMs = remoteRefreshTimeMs;
computeTimeMsLag();
this.remoteRefreshTimeMs = refreshTimeMs;
// When multiple refreshes have failed, there is a possibility that retry is ongoing while another refresh gets
// triggered. After the segments have been uploaded and before the below code runs, the updateLocalRefreshTimeAndSeqNo
// method is triggered, which will update the local localRefreshTimeMs. Now, the lag would basically become the
// time since the last refresh happened locally.
this.remoteRefreshStartTimeMs = refreshTimeMs == this.localRefreshTimeMs ? -1 : this.localRefreshTimeMs;
}

public void updateRemoteRefreshClockTimeMs(long remoteRefreshClockTimeMs) {
Expand All @@ -243,12 +257,11 @@ public long getRefreshSeqNoLag() {
return refreshSeqNoLag;
}

private void computeTimeMsLag() {
timeMsLag = localRefreshTimeMs - remoteRefreshTimeMs;
}

public long getTimeMsLag() {
return timeMsLag;
if (remoteRefreshTimeMs == localRefreshTimeMs) {
return 0;
}
return currentTimeMsUsingSystemNanos() - remoteRefreshStartTimeMs;
}

public long getBytesLag() {
Expand Down Expand Up @@ -354,7 +367,7 @@ public RemoteSegmentTransferTracker.Stats stats() {
shardId,
localRefreshClockTimeMs,
remoteRefreshClockTimeMs,
timeMsLag,
getTimeMsLag(),
localRefreshSeqNo,
remoteRefreshSeqNo,
uploadBytesStarted.get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ public boolean validate(RemoteSegmentTransferTracker pressureTracker, ShardId sh
return true;
}
if (pressureTracker.isUploadTimeMovingAverageReady() == false) {
logger.trace("upload time moving average is not ready");
return true;
}
long timeLag = pressureTracker.getTimeMsLag();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.index.remote.RemoteSegmentTransferTracker.currentTimeMsUsingSystemNanos;

public class RemoteSegmentTransferTrackerTests extends OpenSearchTestCase {
private RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory;
private ClusterService clusterService;
Expand Down Expand Up @@ -92,7 +94,7 @@ public void testUpdateLocalRefreshTimeMs() {
directoryFileTransferTracker,
remoteStoreStatsTrackerFactory.getMovingAverageWindowSize()
);
long refreshTimeMs = System.nanoTime() / 1_000_000L + randomIntBetween(10, 100);
long refreshTimeMs = currentTimeMsUsingSystemNanos() + randomIntBetween(10, 100);
transferTracker.updateLocalRefreshTimeMs(refreshTimeMs);
assertEquals(refreshTimeMs, transferTracker.getLocalRefreshTimeMs());
}
Expand All @@ -103,7 +105,7 @@ public void testUpdateRemoteRefreshTimeMs() {
directoryFileTransferTracker,
remoteStoreStatsTrackerFactory.getMovingAverageWindowSize()
);
long refreshTimeMs = System.nanoTime() / 1_000_000 + randomIntBetween(10, 100);
long refreshTimeMs = currentTimeMsUsingSystemNanos() + randomIntBetween(10, 100);
transferTracker.updateRemoteRefreshTimeMs(refreshTimeMs);
assertEquals(refreshTimeMs, transferTracker.getRemoteRefreshTimeMs());
}
Expand Down Expand Up @@ -133,20 +135,29 @@ public void testComputeSeqNoLagOnUpdate() {
assertEquals(localRefreshSeqNo - remoteRefreshSeqNo, transferTracker.getRefreshSeqNoLag());
}

public void testComputeTimeLagOnUpdate() {
public void testComputeTimeLagOnUpdate() throws InterruptedException {
transferTracker = new RemoteSegmentTransferTracker(
shardId,
directoryFileTransferTracker,
remoteStoreStatsTrackerFactory.getMovingAverageWindowSize()
);
long currentLocalRefreshTimeMs = transferTracker.getLocalRefreshTimeMs();
long currentTimeMs = System.nanoTime() / 1_000_000L;
long localRefreshTimeMs = currentTimeMs + randomIntBetween(100, 500);
long remoteRefreshTimeMs = currentTimeMs + randomIntBetween(50, 99);
transferTracker.updateLocalRefreshTimeMs(localRefreshTimeMs);
assertEquals(localRefreshTimeMs - currentLocalRefreshTimeMs, transferTracker.getTimeMsLag());
transferTracker.updateRemoteRefreshTimeMs(remoteRefreshTimeMs);
assertEquals(localRefreshTimeMs - remoteRefreshTimeMs, transferTracker.getTimeMsLag());

// No lag if there is a remote upload corresponding to a local refresh
assertEquals(0, transferTracker.getTimeMsLag());

// Set a local refresh time that is higher than remote refresh time
Thread.sleep(1);
transferTracker.updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos());

// Sleep for 100ms and then the lag should be within 100ms +/- 20ms
Thread.sleep(100);
assertTrue(Math.abs(transferTracker.getTimeMsLag() - 100) <= 20);

transferTracker.updateRemoteRefreshTimeMs(transferTracker.getLocalRefreshTimeMs());
transferTracker.updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos());
long random = randomIntBetween(50, 200);
Thread.sleep(random);
assertTrue(Math.abs(transferTracker.getTimeMsLag() - random) <= 20);
}

public void testAddUploadBytesStarted() {
Expand Down Expand Up @@ -519,7 +530,7 @@ public void testStatsObjectCreation() {
transferTracker = constructTracker();
RemoteSegmentTransferTracker.Stats transferTrackerStats = transferTracker.stats();
assertEquals(transferTracker.getShardId(), transferTrackerStats.shardId);
assertEquals(transferTracker.getTimeMsLag(), (int) transferTrackerStats.refreshTimeLagMs);
assertTrue(Math.abs(transferTracker.getTimeMsLag() - transferTrackerStats.refreshTimeLagMs) <= 20);
assertEquals(transferTracker.getLocalRefreshSeqNo(), (int) transferTrackerStats.localRefreshNumber);
assertEquals(transferTracker.getRemoteRefreshSeqNo(), (int) transferTrackerStats.remoteRefreshNumber);
assertEquals(transferTracker.getBytesLag(), (int) transferTrackerStats.bytesLag);
Expand Down Expand Up @@ -591,9 +602,9 @@ private RemoteSegmentTransferTracker constructTracker() {
);
transferTracker.incrementTotalUploadsStarted();
transferTracker.incrementTotalUploadsFailed();
transferTracker.updateUploadTimeMovingAverage(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100));
transferTracker.updateUploadTimeMovingAverage(currentTimeMsUsingSystemNanos() + randomIntBetween(10, 100));
transferTracker.updateUploadBytesMovingAverage(99);
transferTracker.updateRemoteRefreshTimeMs(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100));
transferTracker.updateRemoteRefreshTimeMs(currentTimeMsUsingSystemNanos() + randomIntBetween(10, 100));
transferTracker.incrementRejectionCount();
transferTracker.getDirectoryFileTransferTracker().addTransferredBytesStarted(10);
transferTracker.getDirectoryFileTransferTracker().addTransferredBytesSucceeded(10, System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.IntStream;

import static org.opensearch.index.remote.RemoteSegmentTransferTracker.currentTimeMsUsingSystemNanos;
import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexShard;

public class RemoteStorePressureServiceTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -68,7 +71,7 @@ public void testIsSegmentsUploadBackpressureEnabled() {
assertTrue(pressureService.isSegmentsUploadBackpressureEnabled());
}

public void testValidateSegmentUploadLag() {
public void testValidateSegmentUploadLag() throws InterruptedException {
// Create the pressure tracker
IndexShard indexShard = createIndexShard(shardId, true);
remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, Settings.EMPTY);
Expand All @@ -86,14 +89,27 @@ public void testValidateSegmentUploadLag() {
sum.addAndGet(i);
});
double avg = (double) sum.get() / 20;
long currentMs = System.nanoTime() / 1_000_000;
pressureTracker.updateLocalRefreshTimeMs((long) (currentMs + 12 * avg));
pressureTracker.updateRemoteRefreshTimeMs(currentMs);
Exception e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId));
assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments"));
assertTrue(e.getMessage().contains("time_lag:114 ms dynamic_time_lag_threshold:95.0 ms"));

pressureTracker.updateRemoteRefreshTimeMs((long) (currentMs + 2 * avg));
// We run this to ensure that the local and remote refresh time are not same anymore
while (pressureTracker.getLocalRefreshTimeMs() == currentTimeMsUsingSystemNanos()) {
Thread.sleep(10);
}
long localRefreshTimeMs = currentTimeMsUsingSystemNanos();
pressureTracker.updateLocalRefreshTimeMs(localRefreshTimeMs);

while (currentTimeMsUsingSystemNanos() - localRefreshTimeMs <= 20 * avg) {
Thread.sleep((long) (4 * avg));
}
Exception e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId));
String regex = "^rejected execution on primary shard:\\[index]\\[0] due to remote segments lagging behind "
+ "local segments.time_lag:[0-9]{2,3} ms dynamic_time_lag_threshold:95\\.0 ms$";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(e.getMessage());
assertTrue(matcher.matches());

pressureTracker.updateRemoteRefreshTimeMs(pressureTracker.getLocalRefreshTimeMs());
pressureTracker.updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos());
Thread.sleep((long) (2 * avg));
pressureService.validateSegmentsUploadLag(shardId);

// 2. bytes lag more than dynamic threshold
Expand Down

0 comments on commit 14d4a63

Please sign in to comment.