Plug the new snapshot when needed and add tests
jimczi committed Nov 6, 2024
1 parent 0e02eb4 commit a96648d
Showing 28 changed files with 1,568 additions and 1,186 deletions.
@@ -14,6 +14,9 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.Sort;
@@ -24,16 +27,20 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.codec.PerFieldMapperCodec;
import org.elasticsearch.index.codec.zstd.Zstd814StoredFieldsFormat;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.LuceneBatchChangesSnapshot;
import org.elasticsearch.index.engine.LuceneChangesSnapshot;
import org.elasticsearch.index.engine.LuceneSyntheticSourceChangesSnapshot;
import org.elasticsearch.index.engine.SearchBasedChangesSnapshot;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
@@ -65,14 +72,16 @@
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;

import static org.elasticsearch.index.engine.Engine.ROOT_DOC_FIELD_NAME;

@Fork(value = 1)
@Warmup(iterations = 3, time = 3)
@Measurement(iterations = 5, time = 3)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
public class LuceneChangesSnapshotBenchmark {
@Param({ "default@1024", "logsdb@1024", "logsdb-batch@16", "logsdb-batch@64", "logsdb-batch@512", "logsdb-batch@1024" })
@Param({ "default", "logsdb@1kb", "logsdb@4MB" })
String mode;

@Param({ "false", "true" })
@@ -96,14 +105,20 @@ public class LuceneChangesSnapshotBenchmark {
@Setup
public void setup() throws IOException {
this.path = Files.createTempDirectory("snapshot_changes");
-        Settings settings = mode.startsWith("logsdb") ? Settings.builder().put("index.mode", "logsdb").build() : Settings.EMPTY;
+        Settings settings = mode.startsWith("logsdb")
+            ? Settings.builder()
+                .put("index.mode", "logsdb")
+                .put(IndexSettings.INDICES_RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING.getKey(), true)
+                .build()
+            : Settings.EMPTY;
this.mapperService = MapperServiceFactory.create(settings, readMappings(dataset));
IndexWriterConfig config = new IndexWriterConfig();
config.setCodec(
new PerFieldMapperCodec(Zstd814StoredFieldsFormat.Mode.BEST_COMPRESSION, mapperService, BigArrays.NON_RECYCLING_INSTANCE)
);
if (sequential == false) {
config.setIndexSort(new Sort(new SortField[] { new SortField("rand", SortField.Type.LONG) }));
config.setParentField(ROOT_DOC_FIELD_NAME);
}
-        try (FSDirectory dir = FSDirectory.open(path); IndexWriter writer = new IndexWriter(dir, config);) {
+        try (
@@ -140,6 +155,17 @@ public void setup() throws IOException {
DirectoryReader.open(dir),
new ShardId(mapperService.getIndexSettings().getIndex(), 0)
);
long sizeInBytes = 0;
for (LeafReaderContext readerContext : reader.leaves()) {
                // go down to the segment level to get accurate size numbers
                final SegmentReader segmentReader = Lucene.segmentReader(readerContext.reader());
                SegmentCommitInfo info = segmentReader.getSegmentInfo();
                try {
                    sizeInBytes += info.sizeInBytes();
                } catch (IOException e) {
                    // best-effort size report; ignore segments whose size cannot be read
                }
}
System.out.println("Size: " + ByteSizeValue.ofBytes(sizeInBytes));

this.searcher = new Engine.Searcher("snapshot", reader, new BM25Similarity(), null, new QueryCachingPolicy() {
@Override
public void onUse(Query query) {}
@@ -167,40 +193,38 @@ public void tearDown() {
@Benchmark
@OperationsPerInvocation(NUM_OPS)
public long recover() throws IOException {
-        long totalSize = 0;
        String indexMode = mode.split("@")[0];
-        int batchSize = Integer.parseInt(mode.split("@")[1]);
-        Translog.Snapshot snapshot = switch (indexMode) {
-            case "default":
-            case "logsdb":
-                yield new LuceneChangesSnapshot(
+        Translog.Snapshot snapshot = switch (mapperService.getIndexSettings().getMode()) {
+            case LOGSDB:
+                assert indexMode.equals("logsdb");
+                long maxMemorySize = ByteSizeValue.parseBytesSizeValue(mode.split("@")[1], "").getBytes();
+                yield new LuceneSyntheticSourceChangesSnapshot(
+                    mapperService.mappingLookup(),
                    searcher,
-                    batchSize,
+                    SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE,
+                    maxMemorySize,
                    0,
                    NUM_OPS - 1,
                    true,
                    true,
-                    true,
                    IndexVersion.current()
                );

-            case "logsdb-batch":
-                yield new LuceneBatchChangesSnapshot(
-                    mapperService.mappingLookup(),
+            default:
+                assert indexMode.equals("default");
+                yield new LuceneChangesSnapshot(
                    searcher,
-                    batchSize,
+                    SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE,
                    0,
                    NUM_OPS - 1,
                    true,
                    true,
                    true,
                    IndexVersion.current()
                );
-
-            default:
-                throw new IllegalArgumentException("Unknown mode " + indexMode);
        };

+        long totalSize = 0;
        try (snapshot) {
            Translog.Operation op;
            while ((op = snapshot.next()) != null) {
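For reference, a small sketch (illustrative, not part of the commit) of how the new mode strings decompose in recover(); ByteSizeValue accepts unit suffixes such as "1kb" and "4MB", and the second argument of parseBytesSizeValue is only a name used in error messages:

String mode = "logsdb@4MB"; // one of the new @Param values
String indexMode = mode.split("@")[0]; // "logsdb"
long maxMemorySize = ByteSizeValue.parseBytesSizeValue(mode.split("@")[1], "mode").getBytes(); // 4194304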
@@ -189,6 +189,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_READ_SETTING,
IndexSettings.SYNTHETIC_SOURCE_SECOND_DOC_PARSING_PASS_SETTING,
SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING,
IndexSettings.INDICES_RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
20 changes: 19 additions & 1 deletion server/src/main/java/org/elasticsearch/index/IndexSettings.java
@@ -51,6 +51,7 @@
import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING;
import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING;
import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING;
import static org.elasticsearch.index.mapper.SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING;

/**
* This class encapsulates all index level settings and handles settings updates.
@@ -660,6 +661,13 @@ public Iterator<Setting<?>> settings() {
Property.Dynamic
);

public static final Setting<Boolean> INDICES_RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING = Setting.boolSetting(
"index.recovery.recovery_source.synthetic.enabled",
false,
Property.IndexScope,
Property.Final
);

/**
* Returns <code>true</code> if TSDB encoding is enabled. The default is <code>true</code>
*/
@@ -832,6 +840,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
private volatile boolean syntheticSourceSecondDocParsingPassEnabled;
private final SourceFieldMapper.Mode indexMappingSourceMode;
private final boolean recoverySourceEnabled;
private final boolean recoverySourceSyntheticEnabled;

/**
* The maximum number of refresh listeners allows on this shard.
@@ -993,8 +1002,9 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSettings) {
skipIgnoredSourceWrite = scopedSettings.get(IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_WRITE_SETTING);
skipIgnoredSourceRead = scopedSettings.get(IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_READ_SETTING);
syntheticSourceSecondDocParsingPassEnabled = scopedSettings.get(SYNTHETIC_SOURCE_SECOND_DOC_PARSING_PASS_SETTING);
-        indexMappingSourceMode = scopedSettings.get(SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING);
+        indexMappingSourceMode = scopedSettings.get(INDEX_MAPPER_SOURCE_MODE_SETTING);
        recoverySourceEnabled = RecoverySettings.INDICES_RECOVERY_SOURCE_ENABLED_SETTING.get(nodeSettings);
+        recoverySourceSyntheticEnabled = scopedSettings.get(INDICES_RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING);

scopedSettings.addSettingsUpdateConsumer(
MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING,
@@ -1698,6 +1708,14 @@ public boolean isRecoverySourceEnabled() {
return recoverySourceEnabled;
}

/**
* @return Whether recovery source should always be bypassed in favor of using synthetic source.
*/
public boolean isRecoverySourceSyntheticEnabled() {
return version.onOrAfter(IndexVersions.USE_SYNTHETIC_SOURCE_FOR_RECOVERY)
&& recoverySourceSyntheticEnabled;
}

/**
* The bounds for {@code @timestamp} on this index or
* {@code null} if there are no bounds.
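For illustration only (not part of the diff), opting an index into synthetic-source recovery combines the new setting with a synthetic-source index mode, as the benchmark setup above does for logsdb:

// Sketch: both conditions checked by isRecoverySourceSyntheticEnabled() must hold.
Settings settings = Settings.builder()
    .put("index.mode", "logsdb") // logsdb implies synthetic _source
    .put(IndexSettings.INDICES_RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING.getKey(), true)
    .build();

Because the setting is Property.Final it can only be set at index creation time, and the getter additionally requires the index version gate introduced in the next file.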
@@ -131,6 +131,8 @@ private static Version parseUnchecked(String version) {

public static final IndexVersion UPGRADE_TO_LUCENE_10_0_0 = def(9_000_00_0, Version.LUCENE_10_0_0);

public static final IndexVersion USE_SYNTHETIC_SOURCE_FOR_RECOVERY = def(9_001_00_0, Version.LUCENE_10_0_0);

/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
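The new constant feeds the version gate in IndexSettings#isRecoverySourceSyntheticEnabled above; a minimal sketch of that check (names taken from the code above):

// Indices created before USE_SYNTHETIC_SOURCE_FOR_RECOVERY never use synthetic
// recovery source, even when the index setting is enabled.
boolean eligible = indexSettings.getIndexVersionCreated().onOrAfter(IndexVersions.USE_SYNTHETIC_SOURCE_FOR_RECOVERY);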
@@ -24,6 +24,7 @@ final class CombinedDocValues {
private final NumericDocValues primaryTermDV;
private final NumericDocValues tombstoneDV;
private final NumericDocValues recoverySource;
private final NumericDocValues recoverySourceSize;

CombinedDocValues(LeafReader leafReader) throws IOException {
this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
@@ -34,6 +35,7 @@
);
this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
this.recoverySourceSize = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_SIZE_NAME);
}

long docVersion(int segmentDocId) throws IOException {
@@ -79,4 +81,12 @@ boolean hasRecoverySource(int segmentDocId) throws IOException {
assert recoverySource.docID() < segmentDocId;
return recoverySource.advanceExact(segmentDocId);
}

long recoverySourceSize(int segmentDocId) throws IOException {
if (recoverySourceSize == null) {
return -1;
}
assert recoverySourceSize.docID() < segmentDocId;
return recoverySourceSize.advanceExact(segmentDocId) ? recoverySourceSize.longValue() : -1;
}
}
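A hypothetical caller (illustrative; DEFAULT_DOC_SIZE is not part of this commit) treats the -1 sentinel as "size unavailable" and falls back to an estimate:

long size = combinedDocValues.recoverySourceSize(segmentDocId);
// -1 means the recovery-source size field is missing or has no value for this doc
long estimate = size >= 0 ? size : DEFAULT_DOC_SIZE; // hypothetical fallback constant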
@@ -2709,7 +2709,9 @@ private IndexWriterConfig getIndexWriterConfig() {
// always configure soft-deletes field so an engine with soft-deletes disabled can open a Lucene index with soft-deletes.
iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
mergePolicy = new RecoverySourcePruneMergePolicy(
-            SourceFieldMapper.RECOVERY_SOURCE_NAME,
+            engineConfig.getMapperService().mappingLookup().isSourceSynthetic()
+                && engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled() ? null : SourceFieldMapper.RECOVERY_SOURCE_NAME,
+            SourceFieldMapper.RECOVERY_SOURCE_SIZE_NAME,
engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES,
softDeletesPolicy::getRetentionQuery,
new SoftDeletesRetentionMergePolicy(
@@ -3153,16 +3155,32 @@ public Translog.Snapshot newChangesSnapshot(
refreshIfNeeded(source, toSeqNo);
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
try {
-            LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(
-                searcher,
-                LuceneChangesSnapshot.DEFAULT_BATCH_SIZE,
-                fromSeqNo,
-                toSeqNo,
-                requiredFullRange,
-                singleConsumer,
-                accessStats,
-                config().getIndexSettings().getIndexVersionCreated()
-            );
+            final Translog.Snapshot snapshot;
+            if (engineConfig.getMapperService().mappingLookup().isSourceSynthetic()
+                && engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled()) {
+                snapshot = new LuceneSyntheticSourceChangesSnapshot(
+                    engineConfig.getMapperService().mappingLookup(),
+                    searcher,
+                    SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE,
+                    LuceneSyntheticSourceChangesSnapshot.DEFAULT_MEMORY_SIZE,
+                    fromSeqNo,
+                    toSeqNo,
+                    requiredFullRange,
+                    accessStats,
+                    config().getIndexSettings().getIndexVersionCreated()
+                );
+            } else {
+                snapshot = new LuceneChangesSnapshot(
+                    searcher,
+                    SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE,
+                    fromSeqNo,
+                    toSeqNo,
+                    requiredFullRange,
+                    singleConsumer,
+                    accessStats,
+                    config().getIndexSettings().getIndexVersionCreated()
+                );
+            }
searcher = null;
return snapshot;
} catch (Exception e) {
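Both branches yield a Translog.Snapshot, so callers drain either implementation the same way. A minimal consumption sketch (argument order assumed from the signature above; the estimateSize() tally mirrors what the benchmark's recover() loop is assumed to do):

try (Translog.Snapshot snapshot = engine.newChangesSnapshot("peer-recovery", fromSeqNo, toSeqNo, true, true, true)) {
    Translog.Operation op;
    long totalSize = 0;
    while ((op = snapshot.next()) != null) {
        totalSize += op.estimateSize(); // bytes for the replayed operation
    }
}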