Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jimczi committed Nov 14, 2024
1 parent 67ca170 commit 6b08b40
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,7 @@ public class LuceneChangesSnapshotBenchmark {
public void setup() throws IOException {
this.path = Files.createTempDirectory("snapshot_changes");
Settings settings = mode.startsWith("logsdb")
? Settings.builder()
.put("index.mode", "logsdb")
.put(IndexSettings.RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING.getKey(), true)
.build()
? Settings.builder().put("index.mode", "logsdb").put(IndexSettings.RECOVERY_USE_SYNTHETIC_SOURCE_SETTING.getKey(), true).build()
: Settings.EMPTY;
this.mapperService = MapperServiceFactory.create(settings, readMappings(dataset));
IndexWriterConfig config = new IndexWriterConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_WRITE_SETTING,
IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_READ_SETTING,
SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING,
IndexSettings.RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING,
IndexSettings.RECOVERY_USE_SYNTHETIC_SOURCE_SETTING,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Expand Down
54 changes: 49 additions & 5 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand All @@ -51,6 +52,7 @@
import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING;
import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING;
import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING;
import static org.elasticsearch.index.mapper.SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING;

/**
* This class encapsulates all index level settings and handles settings updates.
Expand Down Expand Up @@ -653,9 +655,51 @@ public Iterator<Setting<?>> settings() {
Property.Final
);

public static final Setting<Boolean> RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING = Setting.boolSetting(
"index.recovery.recovery_source.synthetic.enabled",
public static final Setting<Boolean> RECOVERY_USE_SYNTHETIC_SOURCE_SETTING = Setting.boolSetting(
"index.recovery.use_synthetic_source",
false,
new Setting.Validator<>() {
@Override
public void validate(Boolean value) {}

@Override
public void validate(Boolean enabled, Map<Setting<?>, Object> settings) {
if (enabled == false) {
return;
}
var mode = (SourceFieldMapper.Mode) settings.get(INDEX_MAPPER_SOURCE_MODE_SETTING);
if (mode != SourceFieldMapper.Mode.SYNTHETIC) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"The setting [%s] is only permitted when [%s] is set to [%s]. Current mode: [%s].",
RECOVERY_USE_SYNTHETIC_SOURCE_SETTING.getKey(),
INDEX_MAPPER_SOURCE_MODE_SETTING.getKey(),
SourceFieldMapper.Mode.SYNTHETIC.name(),
mode.name()
)
);
}
var version = (IndexVersion) settings.get(SETTING_INDEX_VERSION_CREATED);
if (version.before(IndexVersions.USE_SYNTHETIC_SOURCE_FOR_RECOVERY)) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"The setting [%s] is unavailable on this cluster because some nodes are running older "
+ "versions that do not support it. Please upgrade all nodes to the latest version "
+ "and try again.",
RECOVERY_USE_SYNTHETIC_SOURCE_SETTING.getKey()
)
);
}
}

@Override
public Iterator<Setting<?>> settings() {
List<Setting<?>> res = List.of(INDEX_MAPPER_SOURCE_MODE_SETTING, SETTING_INDEX_VERSION_CREATED);
return res.iterator();
}
},
Property.IndexScope,
Property.Final
);
Expand Down Expand Up @@ -992,9 +1036,9 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
es87TSDBCodecEnabled = scopedSettings.get(TIME_SERIES_ES87TSDB_CODEC_ENABLED_SETTING);
skipIgnoredSourceWrite = scopedSettings.get(IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_WRITE_SETTING);
skipIgnoredSourceRead = scopedSettings.get(IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_READ_SETTING);
indexMappingSourceMode = scopedSettings.get(SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING);
indexMappingSourceMode = scopedSettings.get(INDEX_MAPPER_SOURCE_MODE_SETTING);
recoverySourceEnabled = RecoverySettings.INDICES_RECOVERY_SOURCE_ENABLED_SETTING.get(nodeSettings);
recoverySourceSyntheticEnabled = scopedSettings.get(RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING);
recoverySourceSyntheticEnabled = scopedSettings.get(RECOVERY_USE_SYNTHETIC_SOURCE_SETTING);

scopedSettings.addSettingsUpdateConsumer(
MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING,
Expand Down Expand Up @@ -1690,7 +1734,7 @@ public boolean isRecoverySourceEnabled() {
* @return Whether recovery source should always be bypassed in favor of using synthetic source.
*/
public boolean isRecoverySourceSyntheticEnabled() {
return version.onOrAfter(IndexVersions.USE_SYNTHETIC_SOURCE_FOR_RECOVERY) && recoverySourceSyntheticEnabled;
return recoverySourceSyntheticEnabled;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2709,8 +2709,10 @@ private IndexWriterConfig getIndexWriterConfig() {
// always configure soft-deletes field so an engine with soft-deletes disabled can open a Lucene index with soft-deletes.
iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
mergePolicy = new RecoverySourcePruneMergePolicy(
useSyntheticSourceForRecovery() ? null : SourceFieldMapper.RECOVERY_SOURCE_NAME,
SourceFieldMapper.RECOVERY_SOURCE_SIZE_NAME,
engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled() ? null : SourceFieldMapper.RECOVERY_SOURCE_NAME,
engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled()
? SourceFieldMapper.RECOVERY_SOURCE_SIZE_NAME
: SourceFieldMapper.RECOVERY_SOURCE_NAME,
engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES,
softDeletesPolicy::getRetentionQuery,
new SoftDeletesRetentionMergePolicy(
Expand Down Expand Up @@ -2754,13 +2756,6 @@ private IndexWriterConfig getIndexWriterConfig() {
return iwc;
}

private boolean useSyntheticSourceForRecovery() {
return engineConfig.getMapperService() != null
&& engineConfig.getMapperService().mappingLookup() != null
&& engineConfig.getMapperService().mappingLookup().isSourceSynthetic()
&& engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled();
}

/** A listener that warms the segments if needed when acquiring a new reader */
static final class RefreshWarmerListener implements BiConsumer<ElasticsearchDirectoryReader, ElasticsearchDirectoryReader> {
private final Warmer warmer;
Expand Down Expand Up @@ -3162,7 +3157,7 @@ public Translog.Snapshot newChangesSnapshot(
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
try {
final Translog.Snapshot snapshot;
if (useSyntheticSourceForRecovery()) {
if (engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled()) {
snapshot = new LuceneSyntheticSourceChangesSnapshot(
engineConfig.getMapperService().mappingLookup(),
searcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@

/**
* A {@link SearchBasedChangesSnapshot} that utilizes a synthetic field loader to rebuild the recovery source.
* This snapshot is activated when {@link IndexSettings#RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING}
* This snapshot is activated when {@link IndexSettings#RECOVERY_USE_SYNTHETIC_SOURCE_SETTING}
* is enabled on the underlying index.
*
* The {@code maxMemorySizeInBytes} parameter limits the total size of uncompressed _sources
* loaded into memory during batch retrieval.
*/
public class LuceneSyntheticSourceChangesSnapshot extends SearchBasedChangesSnapshot {
public static final int DEFAULT_SEARCH_BATCH_SIZE = 1024;
public static final long DEFAULT_MEMORY_SIZE = 4 * 1024 * 1024; // 4MB

private final long maxMemorySizeInBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@

final class RecoverySourcePruneMergePolicy extends OneMergeWrappingMergePolicy {
RecoverySourcePruneMergePolicy(
@Nullable String recoverySourceField,
String recoverySourceSizeField,
@Nullable String pruneStoredFieldName,
String pruneNumericDVFieldName,
boolean pruneIdField,
Supplier<Query> retainSourceQuerySupplier,
MergePolicy in
Expand All @@ -53,21 +53,19 @@ final class RecoverySourcePruneMergePolicy extends OneMergeWrappingMergePolicy {
@Override
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
CodecReader wrapped = toWrap.wrapForMerge(reader);
return wrapReader(recoverySourceField, recoverySourceSizeField, pruneIdField, wrapped, retainSourceQuerySupplier);
return wrapReader(pruneStoredFieldName, pruneNumericDVFieldName, pruneIdField, wrapped, retainSourceQuerySupplier);
}
});
}

private static CodecReader wrapReader(
String recoverySourceField,
String recoverySourceSizeField,
String pruneStoredFieldName,
String pruneNumericDVFieldName,
boolean pruneIdField,
CodecReader reader,
Supplier<Query> retainSourceQuerySupplier
) throws IOException {
NumericDocValues recoverySource = reader.getNumericDocValues(
recoverySourceField != null ? recoverySourceField : recoverySourceSizeField
);
NumericDocValues recoverySource = reader.getNumericDocValues(pruneNumericDVFieldName);
if (recoverySource == null || recoverySource.nextDoc() == DocIdSetIterator.NO_MORE_DOCS) {
return reader; // early terminate - nothing to do here since none of the docs have a recovery source anymore.
}
Expand All @@ -83,34 +81,34 @@ private static CodecReader wrapReader(
return reader; // keep all source
}
return new SourcePruningFilterCodecReader(
recoverySourceField,
recoverySourceSizeField,
pruneStoredFieldName,
pruneNumericDVFieldName,
pruneIdField,
reader,
recoverySourceToKeep
);
} else {
return new SourcePruningFilterCodecReader(recoverySourceField, recoverySourceSizeField, pruneIdField, reader, null);
return new SourcePruningFilterCodecReader(pruneStoredFieldName, pruneNumericDVFieldName, pruneIdField, reader, null);
}
}

private static class SourcePruningFilterCodecReader extends FilterCodecReader {
private final BitSet recoverySourceToKeep;
private final String recoverySourceField;
private final String recoverySourceSizeField;
private final String pruneStoredFieldName;
private final String pruneNumericDVFieldName;
private final boolean pruneIdField;

SourcePruningFilterCodecReader(
String recoverySourceField,
String recoverySourceSizeField,
@Nullable String pruneStoredFieldName,
String pruneNumericDVFieldName,
boolean pruneIdField,
CodecReader reader,
BitSet recoverySourceToKeep
) {
super(reader);
this.recoverySourceField = recoverySourceField;
this.pruneStoredFieldName = pruneStoredFieldName;
this.recoverySourceToKeep = recoverySourceToKeep;
this.recoverySourceSizeField = recoverySourceSizeField;
this.pruneNumericDVFieldName = pruneNumericDVFieldName;
this.pruneIdField = pruneIdField;
}

Expand All @@ -121,8 +119,8 @@ public DocValuesProducer getDocValuesReader() {
@Override
public NumericDocValues getNumeric(FieldInfo field) throws IOException {
NumericDocValues numeric = super.getNumeric(field);
if (field.name.equals(recoverySourceField) || field.name.equals(recoverySourceSizeField)) {
assert numeric != null : recoverySourceField + " must have numeric DV but was null";
if (field.name.equals(pruneNumericDVFieldName)) {
assert numeric != null : pruneNumericDVFieldName + " must have numeric DV but was null";
final DocIdSetIterator intersection;
if (recoverySourceToKeep == null) {
// we can't return null here lucenes DocIdMerger expects an instance
Expand Down Expand Up @@ -157,13 +155,14 @@ public boolean advanceExact(int target) {

@Override
public StoredFieldsReader getFieldsReader() {
if (recoverySourceField == null && pruneIdField == false) {
if (pruneStoredFieldName == null && pruneIdField == false) {
// nothing to prune, we can use the original fields reader
return super.getFieldsReader();
}
return new RecoverySourcePruningStoredFieldsReader(
super.getFieldsReader(),
recoverySourceToKeep,
recoverySourceField,
pruneStoredFieldName,
pruneIdField
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ public void preParse(DocumentParserContext context) throws IOException {
if (enableRecoverySource && originalSource != null && adaptedSource != originalSource) {
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
BytesRef ref = originalSource.toBytesRef();
if (isSynthetic() && context.indexSettings().isRecoverySourceSyntheticEnabled()) {
if (context.indexSettings().isRecoverySourceSyntheticEnabled()) {
assert isSynthetic() : "recovery source should not be disabled on non-synthetic source";
/**
* We use synthetic source for recovery, so we omit the recovery source.
* Instead, we record only the size of the uncompressed source.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ protected Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(INDEX_MAPPER_SOURCE_MODE_SETTING.getKey(), SourceFieldMapper.Mode.SYNTHETIC.name())
.put(IndexSettings.RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING.getKey(), true)
.put(IndexSettings.RECOVERY_USE_SYNTHETIC_SOURCE_SETTING.getKey(), true)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void testPruneAll() throws IOException {
IndexWriterConfig iwc = newIndexWriterConfig();
RecoverySourcePruneMergePolicy mp = new RecoverySourcePruneMergePolicy(
syntheticRecoverySource ? null : "extra_source",
"extra_source_size",
syntheticRecoverySource ? "extra_source_size" : "extra_source",
pruneIdField,
MatchNoDocsQuery::new,
newLogMergePolicy()
Expand Down Expand Up @@ -146,7 +146,7 @@ public void testPruneSome() throws IOException {
iwc.setMergePolicy(
new RecoverySourcePruneMergePolicy(
syntheticRecoverySource ? null : "extra_source",
"extra_source_size",
syntheticRecoverySource ? "extra_source_size" : "extra_source",
pruneIdField,
() -> new TermQuery(new Term("even", "true")),
iwc.getMergePolicy()
Expand Down Expand Up @@ -221,7 +221,7 @@ public void testPruneNone() throws IOException {
iwc.setMergePolicy(
new RecoverySourcePruneMergePolicy(
syntheticRecoverySource ? null : "extra_source",
"extra_source_size",
syntheticRecoverySource ? "extra_source_size" : "extra_source",
false,
MatchAllDocsQuery::new,
iwc.getMergePolicy()
Expand Down
Loading

0 comments on commit 6b08b40

Please sign in to comment.