Commit

Skip upload of segments_N file
Signed-off-by: Sachin Kale <[email protected]>
Sachin Kale committed Aug 18, 2023
1 parent 8e95a82 commit 8249e98
Showing 5 changed files with 35 additions and 67 deletions.
88 changes: 30 additions & 58 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -49,8 +49,6 @@
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
 import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.store.BufferedChecksumIndexInput;
-import org.apache.lucene.store.ChecksumIndexInput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
@@ -2335,7 +2333,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
         assert currentEngineReference.get() == null : "engine is running";
         verifyNotClosed();
         if (indexSettings.isRemoteStoreEnabled() && syncFromRemote) {
-            syncSegmentsFromRemoteSegmentStore(false, true);
+            syncSegmentsFromRemoteSegmentStore(false);
         }
         if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
             if (syncFromRemote) {
@@ -4596,7 +4594,7 @@ public void close() throws IOException {
             };
             IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
             if (indexSettings.isRemoteStoreEnabled()) {
-                syncSegmentsFromRemoteSegmentStore(false, true);
+                syncSegmentsFromRemoteSegmentStore(false);
             }
             if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
                 syncRemoteTranslogAndUpdateGlobalCheckpoint();
@@ -4656,24 +4654,25 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
     /**
      * Downloads segments from remote segment store.
      * @param overrideLocal flag to override local segment files with those in remote store
-     * @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise
      * @throws IOException if exception occurs while reading segments from remote store
      */
-    public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
+    public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException {
         assert indexSettings.isRemoteStoreEnabled();
         logger.trace("Downloading segments from remote segment store");
         RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory();
         // We need to call RemoteSegmentStoreDirectory.init() in order to get latest metadata of the files that
         // are uploaded to the remote segment store.
         RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.init();

+        assert remoteSegmentMetadata != null : "RemoteSegmentMetadata should not be null";
+
         Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteDirectory
             .getSegmentsUploadedToRemoteStore()
             .entrySet()
             .stream()
-            // if this is a refresh level sync, ignore any segments_n uploaded to the store, we will commit the received infos bytes
-            // locally.
-            .filter(entry -> refreshLevelSegmentSync && entry.getKey().startsWith(IndexFileNames.SEGMENTS) == false)
+            .filter(entry -> entry.getKey().startsWith(IndexFileNames.SEGMENTS) == false)
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
         store.incRef();
         remoteStore.incRef();
@@ -4693,24 +4692,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
                 storeDirectory = store.directory();
             }
             copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal);
-
-            if (refreshLevelSegmentSync && remoteSegmentMetadata != null) {
-                final SegmentInfos infosSnapshot = store.buildSegmentInfos(
-                    remoteSegmentMetadata.getSegmentInfosBytes(),
-                    remoteSegmentMetadata.getGeneration()
-                );
-                long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
-                // delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
-                // Extra segments will be wiped on engine open.
-                for (String file : List.of(store.directory().listAll())) {
-                    if (file.startsWith(IndexFileNames.SEGMENTS)) {
-                        store.deleteQuiet(file);
-                    }
-                }
-                assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
-                    : "There should not be any segments file in the dir";
-                store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
-            }
+            commitSegmentInfos(remoteSegmentMetadata);
         } catch (IOException e) {
             throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e);
         } finally {
@@ -4740,36 +4722,14 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
             remoteDirectory.init();
             remoteStore.incRef();
         }
-        Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = sourceRemoteDirectory
-            .initializeToSpecificCommit(primaryTerm, commitGeneration)
-            .getMetadata();
+        RemoteSegmentMetadata remoteSegmentMetadata = sourceRemoteDirectory.initializeToSpecificCommit(primaryTerm, commitGeneration);
+        Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteSegmentMetadata.getMetadata();
         final Directory storeDirectory = store.directory();
         store.incRef();

         try {
-            String segmentsNFile = copySegmentFiles(
-                storeDirectory,
-                sourceRemoteDirectory,
-                remoteDirectory,
-                uploadedSegments,
-                overrideLocal
-            );
-            if (segmentsNFile != null) {
-                try (
-                    ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
-                        storeDirectory.openInput(segmentsNFile, IOContext.DEFAULT)
-                    )
-                ) {
-                    SegmentInfos infosSnapshot = SegmentInfos.readCommit(store.directory(), indexInput, commitGeneration);
-                    long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
-                    if (remoteStore != null) {
-                        store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
-                    } else {
-                        store.directory().sync(infosSnapshot.files(true));
-                        store.directory().syncMetaData();
-                    }
-                }
-            }
+            copySegmentFiles(storeDirectory, sourceRemoteDirectory, remoteDirectory, uploadedSegments, overrideLocal);
+            commitSegmentInfos(remoteSegmentMetadata);
         } catch (IOException e) {
             throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e);
         } finally {
@@ -4780,7 +4740,25 @@
         }
     }

-    private String copySegmentFiles(
+    private void commitSegmentInfos(RemoteSegmentMetadata remoteSegmentMetadata) throws IOException {
+        final SegmentInfos infosSnapshot = store.buildSegmentInfos(
+            remoteSegmentMetadata.getSegmentInfosBytes(),
+            remoteSegmentMetadata.getGeneration()
+        );
+        long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
+        // delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
+        // Extra segments will be wiped on engine open.
+        for (String file : List.of(store.directory().listAll())) {
+            if (file.startsWith(IndexFileNames.SEGMENTS)) {
+                store.deleteQuiet(file);
+            }
+        }
+        assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
+            : "There should not be any segments file in the dir";
+        store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
+    }
+
+    private void copySegmentFiles(
         Directory storeDirectory,
         RemoteSegmentStoreDirectory sourceRemoteDirectory,
         RemoteSegmentStoreDirectory targetRemoteDirectory,
@@ -4789,7 +4767,6 @@ private String copySegmentFiles(
     ) throws IOException {
         List<String> downloadedSegments = new ArrayList<>();
         List<String> skippedSegments = new ArrayList<>();
-        String segmentNFile = null;
         try {
             Set<String> localSegmentFiles = Sets.newHashSet(storeDirectory.listAll());
             if (overrideLocal) {
@@ -4808,16 +4785,11 @@ private String copySegmentFiles(
                 if (targetRemoteDirectory != null) {
                     targetRemoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT);
                 }
-                if (file.startsWith(IndexFileNames.SEGMENTS)) {
-                    assert segmentNFile == null : "There should be only one SegmentInfosSnapshot file";
-                    segmentNFile = file;
-                }
             }
         } finally {
             logger.trace("Downloaded segments here: {}", downloadedSegments);
             logger.trace("Skipped download for segments here: {}", skippedSegments);
         }
-        return segmentNFile;
     }

     private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) {
Original file line number Diff line number Diff line change
@@ -221,7 +221,7 @@ private boolean syncSegments() {
             // Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
             // move.
             long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
-            Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);
+            Collection<String> localSegmentsPostRefresh = segmentInfos.files(false);

             // Create a map of file name to size and update the refresh segment tracker
             updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
Original file line number Diff line number Diff line change
@@ -530,7 +530,7 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
         remoteStore.incRef();
         try {
             // Download segments from remote segment store
-            indexShard.syncSegmentsFromRemoteSegmentStore(true, true);
+            indexShard.syncSegmentsFromRemoteSegmentStore(true);

             if (store.directory().listAll().length == 0) {
                 store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
Original file line number Diff line number Diff line change
@@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
             indexShard.prepareForIndexRecovery();
             final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
             if (hasRemoteSegmentStore) {
-                indexShard.syncSegmentsFromRemoteSegmentStore(false, true);
+                indexShard.syncSegmentsFromRemoteSegmentStore(false);
             }
             final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
             final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
Original file line number Diff line number Diff line change
@@ -423,7 +423,7 @@ private void assertNoLag(RemoteSegmentTransferTracker tracker) {
         assertEquals(0, tracker.getTimeMsLag());
         assertEquals(0, tracker.getRejectionCount());
         assertEquals(tracker.getUploadBytesStarted(), tracker.getUploadBytesSucceeded());
-        assertTrue(tracker.getUploadBytesStarted() > 0);
+        assertEquals(0, tracker.getUploadBytesStarted());
         assertEquals(0, tracker.getUploadBytesFailed());
         assertEquals(0, tracker.getInflightUploads());
         assertEquals(tracker.getTotalUploadsStarted(), tracker.getTotalUploadsSucceeded());
@@ -557,16 +557,12 @@ public TestFilterDirectory(Directory in) {
     private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentStoreDirectory) throws IOException {
         Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteSegmentStoreDirectory
             .getSegmentsUploadedToRemoteStore();
-        String segmentsNFilename = null;
         try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
             SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
             for (String file : segmentInfos.files(true)) {
-                if (!RemoteStoreRefreshListener.EXCLUDE_FILES.contains(file)) {
+                if (RemoteStoreRefreshListener.EXCLUDE_FILES.contains(file) == false && file.startsWith(IndexFileNames.SEGMENTS) == false) {
                     assertTrue(uploadedSegments.containsKey(file));
                 }
-                if (file.startsWith(IndexFileNames.SEGMENTS)) {
-                    segmentsNFilename = file;
-                }
             }
         }
     }
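For readers skimming the diff: the thrust of the change is that segments_N commit files are no longer treated as regular uploaded segment files. They are filtered out of the uploaded-segments map on both the upload and download paths, and the commit point is instead recreated locally from the SegmentInfos bytes carried in the remote metadata (the new commitSegmentInfos helper above). The snippet below is a minimal, standalone sketch of just the filtering step, using only JDK collections; the class name, file names, and sizes are made up for illustration, and SEGMENTS_PREFIX stands in for Lucene's IndexFileNames.SEGMENTS constant (the "segments" file-name prefix) rather than any OpenSearch API.

import java.util.Map;
import java.util.stream.Collectors;

public class SkipSegmentsNSketch {
    // Stand-in for org.apache.lucene.index.IndexFileNames.SEGMENTS ("segments").
    private static final String SEGMENTS_PREFIX = "segments";

    // Mirrors the new filter in syncSegmentsFromRemoteSegmentStore:
    // .filter(entry -> entry.getKey().startsWith(IndexFileNames.SEGMENTS) == false)
    static Map<String, Long> excludeCommitFiles(Map<String, Long> uploadedSegments) {
        return uploadedSegments.entrySet()
            .stream()
            .filter(entry -> entry.getKey().startsWith(SEGMENTS_PREFIX) == false)
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        // Hypothetical file-name -> size map; the real code maps file names to UploadedSegmentMetadata.
        Map<String, Long> uploaded = Map.of("_0.cfe", 405L, "_0.cfs", 2820L, "_0.si", 340L, "segments_3", 255L);
        System.out.println(excludeCommitFiles(uploaded).keySet()); // segments_3 is dropped
    }
}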