Skip to content

Commit

Permalink
Fallback to generation from segmentInfosSnapshot if metadata is not f…
Browse files Browse the repository at this point in the history
…ound in remote store

Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Oct 3, 2023
1 parent 7dc6683 commit 20ae393
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,20 @@ private boolean syncSegments() {

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
SegmentInfos lastCommittedSegmentInfos = indexShard.store().readLastCommittedSegmentsInfo();
if (segmentInfos.getGeneration() != lastCommittedSegmentInfos.getGeneration()) {
logger.info("--> Different generations. segmentInfosSnapshot = {}, lastCommittedSegmentInfos = {}",
segmentInfos.getGeneration(), lastCommittedSegmentInfos.getGeneration());
assert segmentInfos.files(false).equals(lastCommittedSegmentInfos.files(false)) :"SegmentInfos files: "
+ segmentInfos.files(false)
+ " do not match lastCommittedSegmentInfos files: "
+ lastCommittedSegmentInfos.files(false);
}
assert segmentInfos.getGeneration() == checkpoint.getSegmentsGen() : "SegmentInfos generation: "
+ segmentInfos.getGeneration()
+ " does not match metadata generation: "
+ checkpoint.getSegmentsGen();

// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
// move.
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ public void uploadMetadata(
}
storeDirectory.sync(Collections.singleton(metadataFilename));
remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT);
logger.debug("Uploaded metadata file: " + metadataFilename);
} finally {
tryAndDeleteLocalFile(metadataFilename, storeDirectory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
Expand Down Expand Up @@ -73,6 +74,7 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -406,7 +408,23 @@ private void snapshot(
long primaryTerm = indexShard.getOperationPrimaryTerm();
final IndexCommit snapshotIndexCommit = wrappedSnapshot.get();
long commitGeneration = snapshotIndexCommit.getGeneration();
indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
try {
indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
} catch(NoSuchFileException e) {
// Explain why we are doing this
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
if (segmentInfos.getGeneration() != commitGeneration) {
SegmentInfos directoryInfos = indexShard.store().readLastCommittedSegmentsInfo();
if(segmentInfos.files(false).equals(directoryInfos.files(false))) {
logger.info("Different generations for lastIndexCommit = {} and segmentInfosSnapshot = {}. As referred files are same, using generation from segmentInfosSnapshot", commitGeneration, segmentInfos.getGeneration());
indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, segmentInfos.getGeneration());
} else {
throw(e);
}
}
}
}
try {
repository.snapshotRemoteStoreIndexShard(
indexShard.store(),
Expand Down

0 comments on commit 20ae393

Please sign in to comment.