Skip to content

Commit

Permalink
[Remote Segment Store] Add Lucene major version to UploadedSegmentMet…
Browse files Browse the repository at this point in the history
…adata (opensearch-project#8088)

* Add Lucene version to UploadedSegmentMetadata

---------

Signed-off-by: Bhumika Saini <[email protected]>
Signed-off-by: Kaushal Kumar <[email protected]>
  • Loading branch information
Bhumika Saini authored and kaushalmahi12 committed Sep 12, 2023
1 parent 95d2a0a commit cfac7bd
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,25 @@ public static long invertLong(String str) {
}
return Long.MAX_VALUE - num;
}

/**
* Extracts the segment name from the provided segment file name
* @param filename Segment file name to parse
* @return Name of the segment that the segment file belongs to
*/
public static String getSegmentName(String filename) {
// Segment file names follow patterns like "_0.cfe" or "_0_1_Lucene90_0.dvm".
// Here, the segment name is "_0", which is the set of characters
// starting with "_" until the next "_" or first ".".
int endIdx = filename.indexOf('_', 1);
if (endIdx == -1) {
endIdx = filename.indexOf('.');
}

if (endIdx == -1) {
throw new IllegalArgumentException("Unable to infer segment name for segment file " + filename);
}

return filename.substring(0, endIdx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
Expand All @@ -22,6 +24,7 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.Version;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.common.UUIDs;
Expand Down Expand Up @@ -217,6 +220,14 @@ public static class UploadedSegmentMetadata {
private final String checksum;
private final long length;

/**
* The Lucene major version that wrote the original segment files.
* As part of the Lucene version compatibility check, this version information stored in the metadata
* will be used to skip downloading the segment files unnecessarily
* if they were written by an incompatible Lucene version.
*/
private int writtenByMajor;

UploadedSegmentMetadata(String originalFilename, String uploadedFilename, String checksum, long length) {
this.originalFilename = originalFilename;
this.uploadedFilename = uploadedFilename;
Expand All @@ -226,7 +237,14 @@ public static class UploadedSegmentMetadata {

@Override
public String toString() {
return String.join(SEPARATOR, originalFilename, uploadedFilename, checksum, String.valueOf(length));
return String.join(
SEPARATOR,
originalFilename,
uploadedFilename,
checksum,
String.valueOf(length),
String.valueOf(writtenByMajor)
);
}

public String getChecksum() {
Expand All @@ -239,12 +257,35 @@ public long getLength() {

public static UploadedSegmentMetadata fromString(String uploadedFilename) {
String[] values = uploadedFilename.split(SEPARATOR);
return new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3]));
UploadedSegmentMetadata metadata = new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3]));
if (values.length < 5) {
logger.error("Lucene version is missing for UploadedSegmentMetadata: " + uploadedFilename);
}

metadata.setWrittenByMajor(Integer.parseInt(values[4]));

return metadata;
}

public String getOriginalFilename() {
return originalFilename;
}

public void setWrittenByMajor(int writtenByMajor) {
if (writtenByMajor <= Version.LATEST.major && writtenByMajor >= Version.MIN_SUPPORTED_MAJOR) {
this.writtenByMajor = writtenByMajor;
} else {
throw new IllegalArgumentException(
"Lucene major version supplied ("
+ writtenByMajor
+ ") is incorrect. Should be between Version.LATEST ("
+ Version.LATEST.major
+ ") and Version.MIN_SUPPORTED_MAJOR ("
+ Version.MIN_SUPPORTED_MAJOR
+ ")."
);
}
}
}

/**
Expand Down Expand Up @@ -582,10 +623,13 @@ public void uploadMetadata(
);
try {
try (IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT)) {
Map<String, Integer> segmentToLuceneVersion = getSegmentToLuceneVersion(segmentFiles, segmentInfosSnapshot);
Map<String, String> uploadedSegments = new HashMap<>();
for (String file : segmentFiles) {
if (segmentsUploadedToRemoteStore.containsKey(file)) {
uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString());
UploadedSegmentMetadata metadata = segmentsUploadedToRemoteStore.get(file);
metadata.setWrittenByMajor(segmentToLuceneVersion.get(metadata.originalFilename));
uploadedSegments.put(file, metadata.toString());
} else {
throw new NoSuchFileException(file);
}
Expand Down Expand Up @@ -615,6 +659,38 @@ public void uploadMetadata(
}
}

/**
* Parses the provided SegmentInfos to retrieve a mapping of the provided segment files to
* the respective Lucene major version that wrote the segments
* @param segmentFiles List of segment files for which the Lucene major version is needed
* @param segmentInfosSnapshot SegmentInfos instance to parse
* @return Map of the segment file to its Lucene major version
*/
private Map<String, Integer> getSegmentToLuceneVersion(Collection<String> segmentFiles, SegmentInfos segmentInfosSnapshot) {
Map<String, Integer> segmentToLuceneVersion = new HashMap<>();
for (SegmentCommitInfo segmentCommitInfo : segmentInfosSnapshot) {
SegmentInfo info = segmentCommitInfo.info;
Set<String> segFiles = info.files();
for (String file : segFiles) {
segmentToLuceneVersion.put(file, info.getVersion().major);
}
}

for (String file : segmentFiles) {
if (segmentToLuceneVersion.containsKey(file) == false) {
if (file.equals(segmentInfosSnapshot.getSegmentsFileName())) {
segmentToLuceneVersion.put(file, segmentInfosSnapshot.getCommitLuceneVersion().major);
} else {
// Fallback to the Lucene major version of the respective segment's .si file
String segmentInfoFileName = RemoteStoreUtils.getSegmentName(file) + ".si";
segmentToLuceneVersion.put(file, segmentToLuceneVersion.get(segmentInfoFileName));
}
}
}

return segmentToLuceneVersion;
}

/**
* Try to delete file from local store. Fails silently on failures
* @param filename: name of the file to be deleted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,26 @@ public void testinvert() {
assertEquals(num, RemoteStoreUtils.invertLong(RemoteStoreUtils.invertLong(num)));
}
}

public void testGetSegmentNameForCfeFile() {
assertEquals("_foo", RemoteStoreUtils.getSegmentName("_foo.cfe"));
}

public void testGetSegmentNameForDvmFile() {
assertEquals("_bar", RemoteStoreUtils.getSegmentName("_bar_1_Lucene90_0.dvm"));
}

public void testGetSegmentNameWeirdSegmentNameOnlyUnderscore() {
// Validate behaviour when segment name contains delimiters only
assertEquals("_", RemoteStoreUtils.getSegmentName("_.dvm"));
}

public void testGetSegmentNameUnderscoreDelimiterOverrides() {
// Validate behaviour when segment name contains delimiters only
assertEquals("_", RemoteStoreUtils.getSegmentName("___.dvm"));
}

public void testGetSegmentNameException() {
assertThrows(IllegalArgumentException.class, () -> RemoteStoreUtils.getSegmentName("dvd"));
}
}
Loading

0 comments on commit cfac7bd

Please sign in to comment.