Skip to content

Commit

Permalink
Wiring of snapshot restore flow with remote store
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Aug 20, 2024
1 parent b72b9f2 commit 4790fbf
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 16 deletions.
41 changes: 33 additions & 8 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2479,6 +2479,10 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException {
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
openEngineAndRecoverFromTranslog(true);
}

public void openEngineAndRecoverFromTranslog(boolean syncFromRemote) throws IOException {
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
maybeCheckIndex();
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
Expand All @@ -2499,7 +2503,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
loadGlobalCheckpointToReplicationTracker();
}

innerOpenEngineAndTranslog(replicationTracker);
innerOpenEngineAndTranslog(replicationTracker, syncFromRemote);
getEngine().translogManager()
.recoverFromTranslog(translogRecoveryRunner, getEngine().getProcessedLocalCheckpoint(), Long.MAX_VALUE);
}
Expand Down Expand Up @@ -2561,7 +2565,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
if (shardRouting.primary()) {
if (syncFromRemote) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
} else {
} else if (routingEntry().recoverySource().getType() == RecoverySource.Type.SNAPSHOT && ((SnapshotRecoverySource)routingEntry().recoverySource()).getPinnedTimestamp() <= 0){
// we will enter this block when we do not want to recover from remote translog.
// currently only during snapshot restore, we are coming into this block.
// here, as while initiliazing remote translog we cannot skip downloading translog files,
Expand Down Expand Up @@ -5002,6 +5006,10 @@ public void deleteRemoteStoreContents() throws IOException {
}

public void syncTranslogFilesFromRemoteTranslog() throws IOException {
syncTranslogFilesFromRemoteTranslog(-1);
}

public void syncTranslogFilesFromRemoteTranslog(long timestamp) throws IOException {
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
Expand All @@ -5014,7 +5022,8 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
remoteStoreSettings,
logger,
shouldSeedRemoteStore(),
indexSettings().isTranslogMetadataEnabled()
indexSettings().isTranslogMetadataEnabled(),
timestamp
);
}

Expand Down Expand Up @@ -5103,15 +5112,13 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
* Downloads segments from given remote segment store for a specific commit.
* @param overrideLocal flag to override local segment files with those in remote store
* @param sourceRemoteDirectory RemoteSegmentDirectory Instance from which we need to sync segments
* @param primaryTerm Primary Term for shard at the time of commit operation for which we are syncing segments
* @param commitGeneration commit generation at the time of commit operation for which we are syncing segments
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromGivenRemoteSegmentStore(
boolean overrideLocal,
RemoteSegmentStoreDirectory sourceRemoteDirectory,
long primaryTerm,
long commitGeneration
RemoteSegmentMetadata remoteSegmentMetadata,
boolean pinnedTimestamp
) throws IOException {
logger.trace("Downloading segments from given remote segment store");
RemoteSegmentStoreDirectory remoteDirectory = null;
Expand All @@ -5134,12 +5141,30 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
overrideLocal,
() -> {}
);
if (segmentsNFile != null) {
if (pinnedTimestamp) {
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
remoteSegmentMetadata.getSegmentInfosBytes(),
remoteSegmentMetadata.getGeneration()
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
// Extra segments will be wiped on engine open.
for (String file : List.of(store.directory().listAll())) {
if (file.startsWith(IndexFileNames.SEGMENTS)) {
store.deleteQuiet(file);
}
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
else if (segmentsNFile != null) {
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
storeDirectory.openInput(segmentsNFile, IOContext.DEFAULT)
)
) {
long commitGeneration = SegmentInfos.generationFromSegmentsFileName(segmentsNFile);
SegmentInfos infosSnapshot = SegmentInfos.readCommit(store.directory(), indexInput, commitGeneration);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
if (remoteStore != null) {
Expand Down
21 changes: 14 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.translog.Checkpoint;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogHeader;
Expand Down Expand Up @@ -407,12 +408,12 @@ void recoverFromSnapshotAndRemoteStore(
shardId,
shallowCopyShardMetadata.getRemoteStorePathStrategy()
);
sourceRemoteDirectory.initializeToSpecificCommit(
RemoteSegmentMetadata remoteSegmentMetadata = sourceRemoteDirectory.initializeToSpecificCommit(
primaryTerm,
commitGeneration,
recoverySource.snapshot().getSnapshotId().getUUID()
);
indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, primaryTerm, commitGeneration);
indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, remoteSegmentMetadata, false);
final Store store = indexShard.store();
if (indexShard.indexSettings.isRemoteTranslogStoreEnabled() == false) {
bootstrap(indexShard, store);
Expand Down Expand Up @@ -479,15 +480,21 @@ void recoverShallowSnapshotV2(
shardId,
RemoteStoreUtils.determineRemoteStorePathStrategy(prevIndexMetadata)
);
// ToDo : Use segments and translog from pinned timestamp
indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, 0, 1);
indexShard.syncTranslogFilesFromRemoteTranslog();
RemoteSegmentMetadata remoteSegmentMetadata = sourceRemoteDirectory.initializeToSpecificTimestamp(recoverySource.getPinnedTimestamp());

indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, remoteSegmentMetadata, true);
indexShard.syncTranslogFilesFromRemoteTranslog(recoverySource.getPinnedTimestamp());

assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
writeEmptyRetentionLeasesFile(indexShard);
indexShard.recoveryState().getIndex().setFileDetailsComplete();
indexShard.openEngineAndRecoverFromTranslog();
indexShard.openEngineAndRecoverFromTranslog(false);
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
indexShard.postRecovery("restore done");
if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
indexShard.waitForRemoteStoreSync();
}
indexShard.postRecovery("post recovery from remote_store");
listener.onResponse(true);
}
, listener::onFailure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public RemoteFsTranslog(
isTranslogMetadataEnabled
);
try {
download(translogTransferManager, location, logger, config.shouldSeedRemote(), timestamp);
//download(translogTransferManager, location, logger, config.shouldSeedRemote(), timestamp);
Checkpoint checkpoint = readCheckpoint(location);
logger.info("Downloaded data from remote translog till maxSeqNo = {}", checkpoint.maxSeqNo);
this.readers.addAll(recoverFromFiles(checkpoint));
Expand Down

0 comments on commit 4790fbf

Please sign in to comment.