Skip to content

Commit

Permalink
Add comments and java doc
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Oct 23, 2023
1 parent 1da1727 commit 17b9dcd
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -647,8 +647,6 @@ public void testStatsCorrectnessOnFailover() {
assertEquals(0, segmentStats.refreshTimeLagMs);

networkDisruption.stopDisrupting();
// assertBusy(() -> assertEquals(ClusterHealthStatus.GREEN, ensureYellow(TimeValue.timeValueSeconds(1), INDEX_NAME)), 120,
// TimeUnit.SECONDS);
internalCluster().clearDisruptionScheme();
ensureStableCluster(3, clusterManagerNode);
ensureGreen(INDEX_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,63 @@ public RemoteTranslogTransferTracker.Stats stats() {
);
}

/**
 * Renders the tracker's upload and download counters, timestamps and moving averages as a single-line,
 * comma-separated {@code name=value} list wrapped in {@code RemoteTranslogTransferStats{...}}.
 * Each value is read through its own {@code get()} call at the time the string is built.
 *
 * @return a human-readable summary of the current transfer stats, without a dangling separator before the closing brace.
 */
@Override
public String toString() {
    // Separators are folded into the label that follows them, so the last entry is not followed by a stray comma.
    return "RemoteTranslogTransferStats{"
        + "lastSuccessfulUploadTimestamp="
        + lastSuccessfulUploadTimestamp.get()
        + ",totalUploadsStarted="
        + totalUploadsStarted.get()
        + ",totalUploadsSucceeded="
        + totalUploadsSucceeded.get()
        + ",totalUploadsFailed="
        + totalUploadsFailed.get()
        + ",uploadBytesStarted="
        + uploadBytesStarted.get()
        + ",uploadBytesFailed="
        + uploadBytesFailed.get()
        + ",totalUploadTimeInMillis="
        + totalUploadTimeInMillis.get()
        + ",uploadBytesMovingAverage="
        + uploadBytesMovingAverageReference.get().getAverage()
        + ",uploadBytesPerSecMovingAverage="
        + uploadBytesPerSecMovingAverageReference.get().getAverage()
        + ",uploadTimeMovingAverage="
        + uploadTimeMsMovingAverageReference.get().getAverage()
        + ",lastSuccessfulDownloadTimestamp="
        + lastSuccessfulDownloadTimestamp.get()
        + ",totalDownloadsSucceeded="
        + totalDownloadsSucceeded.get()
        + ",downloadBytesSucceeded="
        + downloadBytesSucceeded.get()
        + ",totalDownloadTimeInMillis="
        + totalDownloadTimeInMillis.get()
        + ",downloadBytesMovingAverage="
        + downloadBytesMovingAverageReference.get().getAverage()
        + ",downloadBytesPerSecMovingAverage="
        + downloadBytesPerSecMovingAverageReference.get().getAverage()
        + ",downloadTimeMovingAverage="
        + downloadTimeMsMovingAverageReference.get().getAverage()
        + "}";
}

/**
* Represents the tracker's state as seen in the stats API.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4774,6 +4774,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE
* @throws IOException if exception occurs while reading segments from remote store.
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
boolean syncSegmentSuccess = false;
long startTimeMs = System.currentTimeMillis();
assert indexSettings.isRemoteStoreEnabled();
logger.trace("Downloading segments from remote segment store");
RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory();
Expand Down Expand Up @@ -4823,9 +4825,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
: "There should not be any segments file in the dir";
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
syncSegmentSuccess = true;
} catch (IOException e) {
throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e);
} finally {
logger.trace(
"syncSegmentsFromRemoteSegmentStore success={} elapsedTime={}",
syncSegmentSuccess,
(System.currentTimeMillis() - startTimeMs)
);
store.decRef();
remoteStore.decRef();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ public void beforeRefresh() throws IOException {}

@Override
protected void runAfterRefreshExactlyOnce(boolean didRefresh) {
// We have 2 separate methods to check whether a sync needs to be done. This is required since we use the boolean returned
// by isReadyForUpload to schedule refresh retries when the index shard or the primary mode is not yet in a completely
// ready state.
if (shouldSync(didRefresh) && isReadyForUpload()) {
segmentTracker.updateLocalRefreshTimeAndSeqNo();
try {
Expand Down Expand Up @@ -156,11 +159,15 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
}

private boolean shouldSync(boolean didRefresh) {
    // The tracked primary term differs from the shard's operation primary term.
    if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
        return true;
    }
    // If the readers change, didRefresh is always true.
    if (didRefresh) {
        return true;
    }
    // Zero state segments: the refresh has not changed the reader reference, but the segments still have to be
    // uploaded so that the restore does not break.
    if (remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) {
        return true;
    }
    // The primary term check above only holds the first time shouldSync is called; once the primary term has been
    // updated it no longer evaluates to true in syncSegments. This final check ensures that a commit is still
    // picked up by both the 1st and the 2nd shouldSync call.
    return isRefreshAfterCommitSafe();
}

Expand Down Expand Up @@ -315,6 +322,10 @@ private boolean isRefreshAfterCommit() throws IOException {
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
}

/**
* Returns whether the current refresh has happened after a commit.
* @return true if this refresh has happened on account of a commit; false otherwise, or if an exception occurs.
*/
private boolean isRefreshAfterCommitSafe() {
try {
return isRefreshAfterCommit();
Expand Down Expand Up @@ -440,17 +451,30 @@ private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesB
}
}

/**
 * (Re)initialises the remote segment directory when the shard's operation primary term differs from the primary
 * term tracked here, so that the directory reflects the latest metadata file that has been uploaded to remote
 * store successfully. The segment tracker is then told which segment files are already uploaded.
 *
 * @throws IOException declared for failures while initialising the remote directory.
 */
private void initializeRemoteDirectoryOnTermUpdate() throws IOException {
    if (this.primaryTerm == indexShard.getOperationPrimaryTerm()) {
        // Primary term is unchanged, nothing to re-initialise.
        return;
    }
    logger.trace("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm());
    this.primaryTerm = indexShard.getOperationPrimaryTerm();
    final RemoteSegmentMetadata latestMetadata = this.remoteDirectory.init();
    if (latestMetadata == null) {
        // No uploaded metadata was returned, so the tracker is left untouched.
        return;
    }
    // During failover, the uploaded metadata carries the names of the files already present in the remote store;
    // hand those names to the tracker.
    segmentTracker.setLatestUploadedFiles(latestMetadata.getMetadata().keySet());
}

/**
* This checks for readiness of the index shard and primary mode. This has been separated from shouldSync since we use the
* returned value of this method for scheduling retries in the syncSegments method.
* @return true iff primaryMode is true and index shard is not in closed state.
*/
private boolean isReadyForUpload() {
boolean isReady = indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED;
if (isReady == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t
remoteTranslogTransferTracker
);
RemoteFsTranslog.download(translogTransferManager, location, logger);
logger.trace(remoteTranslogTransferTracker.toString());
}

static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
Expand All @@ -173,15 +174,20 @@ static void download(TranslogTransferManager translogTransferManager, Path locat
*/
IOException ex = null;
for (int i = 0; i <= DOWNLOAD_RETRIES; i++) {
boolean success = false;
long startTimeMs = System.currentTimeMillis();
try {
downloadOnce(translogTransferManager, location, logger);
success = true;
return;
} catch (FileNotFoundException | NoSuchFileException e) {
// continue till download retries
ex = e;
} finally {
logger.trace("downloadOnce success={} timeElapsed={}", success, (System.currentTimeMillis() - startTimeMs));
}
}
logger.debug("Exhausted all download retries during translog/checkpoint file download");
logger.info("Exhausted all download retries during translog/checkpoint file download");
throw ex;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -950,13 +950,6 @@ public ClusterHealthStatus ensureYellow(String... indices) {
return ensureColor(ClusterHealthStatus.YELLOW, TimeValue.timeValueSeconds(30), false, indices);
}

/**
 * Ensures the cluster has a yellow state via the cluster health API, using a caller-supplied timeout.
 *
 * @param timeout the timeout forwarded to {@code ensureColor}.
 * @param indices the indices forwarded to {@code ensureColor}.
 * @return the status returned by {@code ensureColor}.
 */
public ClusterHealthStatus ensureYellow(TimeValue timeout, String... indices) {
    final ClusterHealthStatus expectedStatus = ClusterHealthStatus.YELLOW;
    return ensureColor(expectedStatus, timeout, false, indices);
}

/**
* Ensures the cluster has a red state via the cluster health API.
*/
Expand Down

0 comments on commit 17b9dcd

Please sign in to comment.