diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/index/LuceneChangesSnapshotBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/index/LuceneChangesSnapshotBenchmark.java index c64cc2996ad58..dc98755432cdc 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/index/LuceneChangesSnapshotBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/index/LuceneChangesSnapshotBenchmark.java @@ -14,6 +14,9 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentReader; import org.apache.lucene.search.Query; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.Sort; @@ -24,16 +27,20 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.logging.LogConfigurator; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.IOUtils; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.codec.PerFieldMapperCodec; import org.elasticsearch.index.codec.zstd.Zstd814StoredFieldsFormat; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.LuceneBatchChangesSnapshot; import org.elasticsearch.index.engine.LuceneChangesSnapshot; +import org.elasticsearch.index.engine.LuceneSyntheticSourceChangesSnapshot; +import org.elasticsearch.index.engine.SearchBasedChangesSnapshot; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SourceToParse; @@ -65,6 +72,8 @@ import java.util.concurrent.TimeUnit; import java.util.zip.GZIPInputStream; +import static org.elasticsearch.index.engine.Engine.ROOT_DOC_FIELD_NAME; + @Fork(value = 1) @Warmup(iterations = 3, time = 3) @Measurement(iterations = 5, time = 3) @@ -72,7 +81,7 @@ @OutputTimeUnit(TimeUnit.MILLISECONDS) @State(Scope.Thread) public class LuceneChangesSnapshotBenchmark { - @Param({ "default@1024", "logsdb@1024", "logsdb-batch@16", "logsdb-batch@64", "logsdb-batch@512", "logsdb-batch@1024" }) + @Param({ "default", "logsdb@1kb", "logsdb@4MB" }) String mode; @Param({ "false", "true" }) @@ -96,7 +105,12 @@ public class LuceneChangesSnapshotBenchmark { @Setup public void setup() throws IOException { this.path = Files.createTempDirectory("snapshot_changes"); - Settings settings = mode.startsWith("logsdb") ? Settings.builder().put("index.mode", "logsdb").build() : Settings.EMPTY; + Settings settings = mode.startsWith("logsdb") + ? 
Settings.builder() + .put("index.mode", "logsdb") + .put(IndexSettings.INDICES_RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING.getKey(), true) + .build() + : Settings.EMPTY; this.mapperService = MapperServiceFactory.create(settings, readMappings(dataset)); IndexWriterConfig config = new IndexWriterConfig(); config.setCodec( @@ -104,6 +118,7 @@ public void setup() throws IOException { ); if (sequential == false) { config.setIndexSort(new Sort(new SortField[] { new SortField("rand", SortField.Type.LONG) })); + config.setParentField(ROOT_DOC_FIELD_NAME); } try (FSDirectory dir = FSDirectory.open(path); IndexWriter writer = new IndexWriter(dir, config);) { try ( @@ -140,6 +155,17 @@ public void setup() throws IOException { DirectoryReader.open(dir), new ShardId(mapperService.getIndexSettings().getIndex(), 0) ); + long sizeInBytes = 0; + for (LeafReaderContext readerContext : reader.leaves()) { + // we go on the segment level here to get accurate numbers + final SegmentReader segmentReader = Lucene.segmentReader(readerContext.reader()); + SegmentCommitInfo info = segmentReader.getSegmentInfo(); + try { + sizeInBytes += info.sizeInBytes(); + } catch (IOException e) {} + } + System.out.println("Size: " + ByteSizeValue.ofBytes(sizeInBytes)); + this.searcher = new Engine.Searcher("snapshot", reader, new BM25Similarity(), null, new QueryCachingPolicy() { @Override public void onUse(Query query) {} @@ -167,40 +193,38 @@ public void tearDown() { @Benchmark @OperationsPerInvocation(NUM_OPS) public long recover() throws IOException { - long totalSize = 0; String indexMode = mode.split("@")[0]; - int batchSize = Integer.parseInt(mode.split("@")[1]); - Translog.Snapshot snapshot = switch (indexMode) { - case "default": - case "logsdb": - yield new LuceneChangesSnapshot( + Translog.Snapshot snapshot = switch (mapperService.getIndexSettings().getMode()) { + case LOGSDB: + assert indexMode.equals("logsdb"); + long maxMemorySize = ByteSizeValue.parseBytesSizeValue(mode.split("@")[1], "").getBytes(); + yield new LuceneSyntheticSourceChangesSnapshot( mapperService.mappingLookup(), searcher, - batchSize, + SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE, + maxMemorySize, 0, NUM_OPS - 1, true, true, - true, IndexVersion.current() ); - case "logsdb-batch": - yield new LuceneBatchChangesSnapshot( - mapperService.mappingLookup(), + default: + assert indexMode.equals("default"); + yield new LuceneChangesSnapshot( searcher, - batchSize, + SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE, 0, NUM_OPS - 1, true, true, + true, IndexVersion.current() ); - - default: - throw new IllegalArgumentException("Unknown mode " + indexMode); }; + long totalSize = 0; try (snapshot) { Translog.Operation op; while ((op = snapshot.next()) != null) { diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index f5276bbe49b63..2b4f6b64d646d 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -189,6 +189,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_READ_SETTING, IndexSettings.SYNTHETIC_SOURCE_SECOND_DOC_PARSING_PASS_SETTING, SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING, + IndexSettings.INDICES_RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING, // validate that built-in similarities don't get redefined 
Setting.groupSetting("index.similarity.", (s) -> { diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 25e9c1e3701fb..78b901ceef017 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -51,6 +51,7 @@ import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING; import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING; import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING; +import static org.elasticsearch.index.mapper.SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING; /** * This class encapsulates all index level settings and handles settings updates. @@ -660,6 +661,13 @@ public Iterator> settings() { Property.Dynamic ); + public static final Setting INDICES_RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING = Setting.boolSetting( + "index.recovery.recovery_source.synthetic.enabled", + false, + Property.IndexScope, + Property.Final + ); + /** * Returns true if TSDB encoding is enabled. The default is true */ @@ -832,6 +840,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { private volatile boolean syntheticSourceSecondDocParsingPassEnabled; private final SourceFieldMapper.Mode indexMappingSourceMode; private final boolean recoverySourceEnabled; + private final boolean recoverySourceSyntheticEnabled; /** * The maximum number of refresh listeners allows on this shard. @@ -993,8 +1002,9 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti skipIgnoredSourceWrite = scopedSettings.get(IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_WRITE_SETTING); skipIgnoredSourceRead = scopedSettings.get(IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_READ_SETTING); syntheticSourceSecondDocParsingPassEnabled = scopedSettings.get(SYNTHETIC_SOURCE_SECOND_DOC_PARSING_PASS_SETTING); - indexMappingSourceMode = scopedSettings.get(SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING); + indexMappingSourceMode = scopedSettings.get(INDEX_MAPPER_SOURCE_MODE_SETTING); recoverySourceEnabled = RecoverySettings.INDICES_RECOVERY_SOURCE_ENABLED_SETTING.get(nodeSettings); + recoverySourceSyntheticEnabled = scopedSettings.get(INDICES_RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING); scopedSettings.addSettingsUpdateConsumer( MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, @@ -1698,6 +1708,14 @@ public boolean isRecoverySourceEnabled() { return recoverySourceEnabled; } + /** + * @return Whether recovery source should always be bypassed in favor of using synthetic source. + */ + public boolean isRecoverySourceSyntheticEnabled() { + return version.onOrAfter(IndexVersions.USE_SYNTHETIC_SOURCE_FOR_RECOVERY) + && recoverySourceSyntheticEnabled; + } + /** * The bounds for {@code @timestamp} on this index or * {@code null} if there are no bounds. 
diff --git a/server/src/main/java/org/elasticsearch/index/IndexVersions.java b/server/src/main/java/org/elasticsearch/index/IndexVersions.java index efb1facc79b3a..9a31eab19b3ec 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexVersions.java +++ b/server/src/main/java/org/elasticsearch/index/IndexVersions.java @@ -131,6 +131,8 @@ private static Version parseUnchecked(String version) { public static final IndexVersion UPGRADE_TO_LUCENE_10_0_0 = def(9_000_00_0, Version.LUCENE_10_0_0); + public static final IndexVersion USE_SYNTHETIC_SOURCE_FOR_RECOVERY = def(9_001_00_0, Version.LUCENE_10_0_0); + /* * STOP! READ THIS FIRST! No, really, * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _ diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java index 48fc76063f815..190a1ed8b457a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java @@ -24,6 +24,7 @@ final class CombinedDocValues { private final NumericDocValues primaryTermDV; private final NumericDocValues tombstoneDV; private final NumericDocValues recoverySource; + private final NumericDocValues recoverySourceSize; CombinedDocValues(LeafReader leafReader) throws IOException { this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing"); @@ -34,6 +35,7 @@ final class CombinedDocValues { ); this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME); + this.recoverySourceSize = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_SIZE_NAME); } long docVersion(int segmentDocId) throws IOException { @@ -79,4 +81,12 @@ boolean hasRecoverySource(int segmentDocId) throws IOException { assert recoverySource.docID() < segmentDocId; return recoverySource.advanceExact(segmentDocId); } + + long recoverySourceSize(int segmentDocId) throws IOException { + if (recoverySourceSize == null) { + return -1; + } + assert recoverySourceSize.docID() < segmentDocId; + return recoverySourceSize.advanceExact(segmentDocId) ? recoverySourceSize.longValue() : -1; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 8d43252d178ee..1b1716d5285f4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2709,7 +2709,9 @@ private IndexWriterConfig getIndexWriterConfig() { // always configure soft-deletes field so an engine with soft-deletes disabled can open a Lucene index with soft-deletes. iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD); mergePolicy = new RecoverySourcePruneMergePolicy( - SourceFieldMapper.RECOVERY_SOURCE_NAME, + engineConfig.getMapperService().mappingLookup().isSourceSynthetic() + && engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled() ? 
null : SourceFieldMapper.RECOVERY_SOURCE_NAME, + SourceFieldMapper.RECOVERY_SOURCE_SIZE_NAME, engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES, softDeletesPolicy::getRetentionQuery, new SoftDeletesRetentionMergePolicy( @@ -3153,16 +3155,32 @@ public Translog.Snapshot newChangesSnapshot( refreshIfNeeded(source, toSeqNo); Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL); try { - LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot( - searcher, - LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, - fromSeqNo, - toSeqNo, - requiredFullRange, - singleConsumer, - accessStats, - config().getIndexSettings().getIndexVersionCreated() - ); + final Translog.Snapshot snapshot; + if (engineConfig.getMapperService().mappingLookup().isSourceSynthetic() + && engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled()) { + snapshot = new LuceneSyntheticSourceChangesSnapshot( + engineConfig.getMapperService().mappingLookup(), + searcher, + SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE, + LuceneSyntheticSourceChangesSnapshot.DEFAULT_MEMORY_SIZE, + fromSeqNo, + toSeqNo, + requiredFullRange, + accessStats, + config().getIndexSettings().getIndexVersionCreated() + ); + } else { + snapshot = new LuceneChangesSnapshot( + searcher, + SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE, + fromSeqNo, + toSeqNo, + requiredFullRange, + singleConsumer, + accessStats, + config().getIndexSettings().getIndexVersionCreated() + ); + } searcher = null; return snapshot; } catch (Exception e) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneBatchChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneBatchChangesSnapshot.java deleted file mode 100644 index fad0ea3dcb9a1..0000000000000 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneBatchChangesSnapshot.java +++ /dev/null @@ -1,348 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". 
- */ - -package org.elasticsearch.index.engine; - -import org.apache.lucene.document.LongPoint; -import org.apache.lucene.index.LeafReader; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.NumericDocValues; -import org.apache.lucene.search.BooleanClause; -import org.apache.lucene.search.BooleanQuery; -import org.apache.lucene.search.FieldDoc; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.Sort; -import org.apache.lucene.search.SortField; -import org.apache.lucene.search.TopDocs; -import org.apache.lucene.search.TopFieldCollectorManager; -import org.apache.lucene.util.ArrayUtil; -import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.lucene.search.Queries; -import org.elasticsearch.core.IOUtils; -import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader; -import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; -import org.elasticsearch.index.mapper.MappingLookup; -import org.elasticsearch.index.mapper.SeqNoFieldMapper; -import org.elasticsearch.index.mapper.SourceFieldMetrics; -import org.elasticsearch.index.mapper.SourceLoader; -import org.elasticsearch.index.translog.Translog; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Comparator; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.elasticsearch.index.engine.LuceneChangesSnapshot.getLeafDocIDs; -import static org.elasticsearch.index.engine.LuceneChangesSnapshot.loadFromSourceLoader; -import static org.elasticsearch.index.engine.LuceneChangesSnapshot.loadFromStoredFields; - -/** - * A {@link Translog.Snapshot} implementation that retrieves changes from a Lucene index in batches. - * All operations within a batch are held in memory, with the batch size configurable to control - * memory usage. - */ -public final class LuceneBatchChangesSnapshot implements Translog.Snapshot { - private final int batchSize; - private final long fromSeqNo, toSeqNo; - private long lastSeenSeqNo; - private int skippedOperations; - private final boolean requiredFullRange; - - private final IndexSearcher indexSearcher; - private int docIndex = 0; - private final boolean accessStats; - private final int totalHits; - private ScoreDoc[] scoreDocs; - private final Translog.Operation[] ops; - private final Closeable onClose; - - private final IndexVersion indexVersionCreated; - - private final StoredFieldLoader storedFieldLoader; - private final SourceLoader sourceLoader; - - /** - * Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range. 
- * - * @param engineSearcher the internal engine searcher which will be taken over if the snapshot is opened successfully - * @param batchSize the number of documents to load per batch - * @param fromSeqNo the min requesting seq# - inclusive - * @param toSeqNo the maximum requesting seq# - inclusive - * @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo - * @param accessStats true if the stats of the snapshot can be accessed via {@link #totalOperations()} - * @param indexVersionCreated the version on which this index was created - */ - public LuceneBatchChangesSnapshot( - MappingLookup mappingLookup, - Engine.Searcher engineSearcher, - int batchSize, - long fromSeqNo, - long toSeqNo, - boolean requiredFullRange, - boolean accessStats, - IndexVersion indexVersionCreated - ) throws IOException { - if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) { - throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]"); - } - if (batchSize <= 0) { - throw new IllegalArgumentException("batch_size must be positive [" + batchSize + "]"); - } - final AtomicBoolean closed = new AtomicBoolean(); - this.onClose = () -> { - if (closed.compareAndSet(false, true)) { - IOUtils.close(engineSearcher); - } - }; - final long requestingSize = (toSeqNo - fromSeqNo) == Long.MAX_VALUE ? Long.MAX_VALUE : (toSeqNo - fromSeqNo + 1L); - this.batchSize = requestingSize < batchSize ? Math.toIntExact(requestingSize) : batchSize; - this.fromSeqNo = fromSeqNo; - this.toSeqNo = toSeqNo; - this.lastSeenSeqNo = fromSeqNo - 1; - this.requiredFullRange = requiredFullRange; - this.indexSearcher = newIndexSearcher(engineSearcher); - this.indexSearcher.setQueryCache(null); - this.accessStats = accessStats; - this.ops = new Translog.Operation[this.batchSize]; - this.indexVersionCreated = indexVersionCreated; - final TopDocs topDocs = searchOperations(null, accessStats); - this.totalHits = Math.toIntExact(topDocs.totalHits.value); - this.scoreDocs = topDocs.scoreDocs; - if (mappingLookup != null) { - this.sourceLoader = mappingLookup.newSourceLoader(SourceFieldMetrics.NOOP); - Set storedFields = sourceLoader.requiredStoredFields(); - this.storedFieldLoader = StoredFieldLoader.create(mappingLookup.isSourceSynthetic(), storedFields); - } else { - this.storedFieldLoader = StoredFieldLoader.create(true, Set.of()); - this.sourceLoader = null; - } - fillOps(scoreDocs, ops); - } - - @Override - public void close() throws IOException { - onClose.close(); - } - - @Override - public int totalOperations() { - if (accessStats == false) { - throw new IllegalStateException("Access stats of a snapshot created with [access_stats] is false"); - } - return totalHits; - } - - @Override - public int skippedOperations() { - return skippedOperations; - } - - @Override - public Translog.Operation next() throws IOException { - Translog.Operation op = null; - for (int idx = nextDocIndex(); idx != -1; idx = nextDocIndex()) { - op = ops[idx]; - if (op != null) { - // Only pick the first seen seq# - if (op.seqNo() == lastSeenSeqNo) { - skippedOperations++; - continue; - } - break; - } - } - if (requiredFullRange) { - rangeCheck(op); - } - if (op != null) { - assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() - : "Unexpected operation; " - + "last_seen_seqno [" - + lastSeenSeqNo - + "], from_seqno [" - + fromSeqNo - + "], to_seqno [" - + toSeqNo - + "], op [" - + op - + "]"; - lastSeenSeqNo = 
op.seqNo(); - } - return op; - } - - private void rangeCheck(Translog.Operation op) { - if (op == null) { - if (lastSeenSeqNo < toSeqNo) { - throw new MissingHistoryOperationsException( - "Not all operations between from_seqno [" - + fromSeqNo - + "] " - + "and to_seqno [" - + toSeqNo - + "] found; prematurely terminated last_seen_seqno [" - + lastSeenSeqNo - + "]" - ); - } - } else { - final long expectedSeqNo = lastSeenSeqNo + 1; - if (op.seqNo() != expectedSeqNo) { - throw new MissingHistoryOperationsException( - "Not all operations between from_seqno [" - + fromSeqNo - + "] " - + "and to_seqno [" - + toSeqNo - + "] found; expected seqno [" - + expectedSeqNo - + "]; found [" - + op - + "]" - ); - } - } - } - - private int nextDocIndex() throws IOException { - // we have processed all docs in the current search - fetch the next batch - if (docIndex == scoreDocs.length && docIndex > 0) { - final ScoreDoc prev = scoreDocs[scoreDocs.length - 1]; - scoreDocs = searchOperations((FieldDoc) prev, false).scoreDocs; - fillOps(scoreDocs, ops); - docIndex = 0; - } - if (docIndex < scoreDocs.length) { - int idx = docIndex; - docIndex++; - return idx; - } - return -1; - } - - private void fillOps(ScoreDoc[] scoreDocs, Translog.Operation[] ops) throws IOException { - for (int i = 0; i < scoreDocs.length; i++) { - scoreDocs[i].shardIndex = i; - } - // for better loading performance we sort the array by docID and - // then visit all leaves in order. - ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.doc)); - int docBase = -1; - int maxDoc = 0; - List leaves = indexSearcher.getIndexReader().leaves(); - int readerIndex = 0; - LeafReaderContext leafReader = null; - CombinedDocValues combinedDocValues = null; - LeafStoredFieldLoader leafFields = null; - SourceLoader.Leaf leafSource = null; - for (int i = 0; i < scoreDocs.length; i++) { - var scoreDoc = scoreDocs[i]; - if (scoreDoc.doc >= docBase + maxDoc) { - do { - leafReader = leaves.get(readerIndex++); - docBase = leafReader.docBase; - maxDoc = leafReader.reader().maxDoc(); - } while (scoreDoc.doc >= docBase + maxDoc); - - combinedDocValues = new CombinedDocValues(leafReader.reader()); - var docIds = getLeafDocIDs(scoreDocs, i, docBase, maxDoc); - leafFields = storedFieldLoader.getLoader(leafReader, docIds); - leafSource = sourceLoader != null ? sourceLoader.leaf(leafReader.reader(), docIds) : null; - } - final int segmentDocID = scoreDoc.doc - docBase; - final int index = scoreDoc.shardIndex; - - final long primaryTerm = combinedDocValues.docPrimaryTerm(segmentDocID); - assert primaryTerm > 0 : "nested child document must be excluded"; - final long seqNo = combinedDocValues.docSeqNo(segmentDocID); - final long version = combinedDocValues.docVersion(segmentDocID); - final boolean hasRecoverySource = combinedDocValues.hasRecoverySource(segmentDocID); - final var doc = (hasRecoverySource || leafSource == null) - ? 
loadFromStoredFields(segmentDocID, leafFields.reader(), hasRecoverySource) - : loadFromSourceLoader(segmentDocID, leafFields, leafSource); - - final Translog.Operation op; - final boolean isTombstone = combinedDocValues.isTombstone(segmentDocID); - if (isTombstone && doc.id() == null) { - op = new Translog.NoOp(seqNo, primaryTerm, doc.source().utf8ToString()); - assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]"; - assert assertDocSoftDeleted(leafReader.reader(), segmentDocID) : "Noop but soft_deletes field is not set [" + op + "]"; - } else { - if (isTombstone) { - op = new Translog.Delete(doc.id(), seqNo, primaryTerm, version); - assert assertDocSoftDeleted(leafReader.reader(), segmentDocID) - : "Delete op but soft_deletes field is not set [" + op + "]"; - } else { - if (doc.source() == null) { - // TODO: Callers should ask for the range that source should be retained. Thus we should always - // check for the existence source once we make peer-recovery to send ops after the local checkpoint. - if (requiredFullRange) { - throw new MissingHistoryOperationsException( - "source not found for seqno=" + seqNo + " from_seqno=" + fromSeqNo + " to_seqno=" + toSeqNo - ); - } else { - skippedOperations++; - ops[index] = null; - continue; - } - } - // TODO: pass the latest timestamp from engine. - final long autoGeneratedIdTimestamp = -1; - op = new Translog.Index(doc.id(), seqNo, primaryTerm, version, doc.source(), doc.routing(), autoGeneratedIdTimestamp); - } - } - ops[index] = op; - } - ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.shardIndex)); - } - - private static IndexSearcher newIndexSearcher(Engine.Searcher engineSearcher) throws IOException { - return new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader())); - } - - private static Query rangeQuery(long fromSeqNo, long toSeqNo, IndexVersion indexVersionCreated) { - return new BooleanQuery.Builder().add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST) - .add(Queries.newNonNestedFilter(indexVersionCreated), BooleanClause.Occur.MUST) // exclude non-root nested documents - .build(); - } - - static int countOperations(Engine.Searcher engineSearcher, long fromSeqNo, long toSeqNo, IndexVersion indexVersionCreated) - throws IOException { - if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) { - throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]"); - } - return newIndexSearcher(engineSearcher).count(rangeQuery(fromSeqNo, toSeqNo, indexVersionCreated)); - } - - private TopDocs searchOperations(FieldDoc after, boolean accurateTotalHits) throws IOException { - final Query rangeQuery = rangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo, indexVersionCreated); - assert accurateTotalHits == false || after == null : "accurate total hits is required by the first batch only"; - final SortField sortBySeqNo = new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG); - TopFieldCollectorManager topFieldCollectorManager = new TopFieldCollectorManager( - new Sort(sortBySeqNo), - batchSize, - after, - accurateTotalHits ? 
Integer.MAX_VALUE : 0 - ); - return indexSearcher.search(rangeQuery, topFieldCollectorManager); - } - - private static boolean assertDocSoftDeleted(LeafReader leafReader, int segmentDocId) throws IOException { - final NumericDocValues ndv = leafReader.getNumericDocValues(Lucene.SOFT_DELETES_FIELD); - if (ndv == null || ndv.advanceExact(segmentDocId) == false) { - throw new IllegalStateException("DocValues for field [" + Lucene.SOFT_DELETES_FIELD + "] is not found"); - } - return ndv.longValue() == 1; - } -} diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index e44b344d3b283..93a372886b8b3 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -10,61 +10,32 @@ package org.elasticsearch.index.engine; import org.apache.lucene.codecs.StoredFieldsReader; -import org.apache.lucene.document.LongPoint; -import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.NumericDocValues; -import org.apache.lucene.search.BooleanClause; -import org.apache.lucene.search.BooleanQuery; -import org.apache.lucene.search.FieldDoc; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.Sort; -import org.apache.lucene.search.SortField; import org.apache.lucene.search.TopDocs; -import org.apache.lucene.search.TopFieldCollectorManager; import org.apache.lucene.util.ArrayUtil; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader; -import org.elasticsearch.common.lucene.search.Queries; -import org.elasticsearch.core.IOUtils; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.fieldvisitor.FieldsVisitor; -import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.transport.Transports; -import java.io.Closeable; import java.io.IOException; import java.util.Comparator; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; /** * A {@link Translog.Snapshot} from changes in a Lucene index */ -final class LuceneChangesSnapshot implements Translog.Snapshot { - static final int DEFAULT_BATCH_SIZE = 1024; - - private final int searchBatchSize; - private final long fromSeqNo, toSeqNo; +public final class LuceneChangesSnapshot extends SearchBasedChangesSnapshot { private long lastSeenSeqNo; private int skippedOperations; - private final boolean requiredFullRange; private final boolean singleConsumer; - private final IndexSearcher indexSearcher; private int docIndex = 0; - private final boolean accessStats; - private final int totalHits; - private ScoreDoc[] scoreDocs; + private int maxDocIndex; private final ParallelArray parallelArray; - private final Closeable onClose; - - private final IndexVersion indexVersionCreated; private int storedFieldsReaderOrd = -1; private StoredFieldsReader storedFieldsReader = null; @@ -83,7 +54,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { * @param accessStats true if the stats of the snapshot can be accessed via {@link #totalOperations()} * @param 
indexVersionCreated the version on which this index was created */ - LuceneChangesSnapshot( + public LuceneChangesSnapshot( Engine.Searcher engineSearcher, int searchBatchSize, long fromSeqNo, @@ -93,50 +64,26 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { boolean accessStats, IndexVersion indexVersionCreated ) throws IOException { - if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) { - throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]"); - } - if (searchBatchSize <= 0) { - throw new IllegalArgumentException("Search_batch_size must be positive [" + searchBatchSize + "]"); - } - final AtomicBoolean closed = new AtomicBoolean(); - this.onClose = () -> { - if (closed.compareAndSet(false, true)) { - IOUtils.close(engineSearcher); - } - }; - final long requestingSize = (toSeqNo - fromSeqNo) == Long.MAX_VALUE ? Long.MAX_VALUE : (toSeqNo - fromSeqNo + 1L); + super(engineSearcher, searchBatchSize, fromSeqNo, toSeqNo, requiredFullRange, accessStats, indexVersionCreated); this.creationThread = Thread.currentThread(); - this.searchBatchSize = requestingSize < searchBatchSize ? Math.toIntExact(requestingSize) : searchBatchSize; - this.fromSeqNo = fromSeqNo; - this.toSeqNo = toSeqNo; - this.lastSeenSeqNo = fromSeqNo - 1; - this.requiredFullRange = requiredFullRange; this.singleConsumer = singleConsumer; - this.indexSearcher = newIndexSearcher(engineSearcher); - this.indexSearcher.setQueryCache(null); - this.accessStats = accessStats; this.parallelArray = new ParallelArray(this.searchBatchSize); - this.indexVersionCreated = indexVersionCreated; - final TopDocs topDocs = searchOperations(null, accessStats); - this.totalHits = Math.toIntExact(topDocs.totalHits.value()); - this.scoreDocs = topDocs.scoreDocs; - fillParallelArray(scoreDocs, parallelArray); + this.lastSeenSeqNo = fromSeqNo - 1; + final TopDocs topDocs = nextTopDocs(); + this.maxDocIndex = topDocs.scoreDocs.length; + fillParallelArray(topDocs.scoreDocs, parallelArray); } @Override public void close() throws IOException { assert assertAccessingThread(); - onClose.close(); + super.close(); } @Override public int totalOperations() { assert assertAccessingThread(); - if (accessStats == false) { - throw new IllegalStateException("Access stats of a snapshot created with [access_stats] is false"); - } - return totalHits; + return super.totalOperations(); } @Override @@ -146,7 +93,7 @@ public int skippedOperations() { } @Override - public Translog.Operation next() throws IOException { + public Translog.Operation nextOperation() throws IOException { assert assertAccessingThread(); Translog.Operation op = null; for (int idx = nextDocIndex(); idx != -1; idx = nextDocIndex()) { @@ -155,12 +102,6 @@ public Translog.Operation next() throws IOException { break; } } - if (requiredFullRange) { - rangeCheck(op); - } - if (op != null) { - lastSeenSeqNo = op.seqNo(); - } return op; } @@ -171,48 +112,15 @@ private boolean assertAccessingThread() { return true; } - private void rangeCheck(Translog.Operation op) { - if (op == null) { - if (lastSeenSeqNo < toSeqNo) { - throw new MissingHistoryOperationsException( - "Not all operations between from_seqno [" - + fromSeqNo - + "] " - + "and to_seqno [" - + toSeqNo - + "] found; prematurely terminated last_seen_seqno [" - + lastSeenSeqNo - + "]" - ); - } - } else { - final long expectedSeqNo = lastSeenSeqNo + 1; - if (op.seqNo() != expectedSeqNo) { - throw new MissingHistoryOperationsException( - "Not all operations between from_seqno 
[" - + fromSeqNo - + "] " - + "and to_seqno [" - + toSeqNo - + "] found; expected seqno [" - + expectedSeqNo - + "]; found [" - + op - + "]" - ); - } - } - } - private int nextDocIndex() throws IOException { // we have processed all docs in the current search - fetch the next batch - if (docIndex == scoreDocs.length && docIndex > 0) { - final ScoreDoc prev = scoreDocs[scoreDocs.length - 1]; - scoreDocs = searchOperations((FieldDoc) prev, false).scoreDocs; + if (docIndex == maxDocIndex && docIndex > 0) { + var scoreDocs = nextTopDocs().scoreDocs; fillParallelArray(scoreDocs, parallelArray); docIndex = 0; + maxDocIndex = scoreDocs.length; } - if (docIndex < scoreDocs.length) { + if (docIndex < maxDocIndex) { int idx = docIndex; docIndex++; return idx; @@ -237,14 +145,13 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray } int docBase = -1; int maxDoc = 0; - List leaves = indexSearcher.getIndexReader().leaves(); int readerIndex = 0; CombinedDocValues combinedDocValues = null; LeafReaderContext leaf = null; for (ScoreDoc scoreDoc : scoreDocs) { if (scoreDoc.doc >= docBase + maxDoc) { do { - leaf = leaves.get(readerIndex++); + leaf = leaves().get(readerIndex++); docBase = leaf.docBase; maxDoc = leaf.reader().maxDoc(); } while (scoreDoc.doc >= docBase + maxDoc); @@ -253,6 +160,7 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray final int segmentDocID = scoreDoc.doc - docBase; final int index = scoreDoc.shardIndex; parallelArray.leafReaderContexts[index] = leaf; + parallelArray.docID[index] = scoreDoc.doc; parallelArray.seqNo[index] = combinedDocValues.docSeqNo(segmentDocID); parallelArray.primaryTerm[index] = combinedDocValues.docPrimaryTerm(segmentDocID); parallelArray.version[index] = combinedDocValues.docVersion(segmentDocID); @@ -275,16 +183,6 @@ private static boolean hasSequentialAccess(ScoreDoc[] scoreDocs) { return true; } - private static IndexSearcher newIndexSearcher(Engine.Searcher engineSearcher) throws IOException { - return new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader())); - } - - private static Query rangeQuery(long fromSeqNo, long toSeqNo, IndexVersion indexVersionCreated) { - return new BooleanQuery.Builder().add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST) - .add(Queries.newNonNestedFilter(indexVersionCreated), BooleanClause.Occur.MUST) // exclude non-root nested documents - .build(); - } - static int countOperations(Engine.Searcher engineSearcher, long fromSeqNo, long toSeqNo, IndexVersion indexVersionCreated) throws IOException { if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) { @@ -293,23 +191,9 @@ static int countOperations(Engine.Searcher engineSearcher, long fromSeqNo, long return newIndexSearcher(engineSearcher).count(rangeQuery(fromSeqNo, toSeqNo, indexVersionCreated)); } - private TopDocs searchOperations(FieldDoc after, boolean accurateTotalHits) throws IOException { - final Query rangeQuery = rangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo, indexVersionCreated); - assert accurateTotalHits == false || after == null : "accurate total hits is required by the first batch only"; - final SortField sortBySeqNo = new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG); - TopFieldCollectorManager topFieldCollectorManager = new TopFieldCollectorManager( - new Sort(sortBySeqNo), - searchBatchSize, - after, - accurateTotalHits ? 
Integer.MAX_VALUE : 0, - false - ); - return indexSearcher.search(rangeQuery, topFieldCollectorManager); - } - private Translog.Operation readDocAsOp(int docIndex) throws IOException { final LeafReaderContext leaf = parallelArray.leafReaderContexts[docIndex]; - final int segmentDocID = scoreDocs[docIndex].doc - leaf.docBase; + final int segmentDocID = parallelArray.docID[docIndex] - leaf.docBase; final long primaryTerm = parallelArray.primaryTerm[docIndex]; assert primaryTerm > 0 : "nested child document must be excluded"; final long seqNo = parallelArray.seqNo[docIndex]; @@ -385,19 +269,13 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException { + "], op [" + op + "]"; + lastSeenSeqNo = op.seqNo(); return op; } - private static boolean assertDocSoftDeleted(LeafReader leafReader, int segmentDocId) throws IOException { - final NumericDocValues ndv = leafReader.getNumericDocValues(Lucene.SOFT_DELETES_FIELD); - if (ndv == null || ndv.advanceExact(segmentDocId) == false) { - throw new IllegalStateException("DocValues for field [" + Lucene.SOFT_DELETES_FIELD + "] is not found"); - } - return ndv.longValue() == 1; - } - private static final class ParallelArray { final LeafReaderContext[] leafReaderContexts; + final int[] docID; final long[] version; final long[] seqNo; final long[] primaryTerm; @@ -406,6 +284,7 @@ private static final class ParallelArray { boolean useSequentialStoredFieldsReader = false; ParallelArray(int size) { + docID = new int[size]; version = new long[size]; seqNo = new long[size]; primaryTerm = new long[size]; diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java new file mode 100644 index 0000000000000..6ad4d8df7f43c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java @@ -0,0 +1,249 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.util.ArrayUtil; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader; +import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; +import org.elasticsearch.index.mapper.MappingLookup; +import org.elasticsearch.index.mapper.SourceFieldMetrics; +import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.index.translog.Translog; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +/** + * A {@link SearchBasedChangesSnapshot} that utilizes a synthetic field loader to rebuild the recovery source. 
+ * This snapshot is activated when {@link IndexSettings#INDICES_RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING} + * is enabled on the underlying index. + * + * The {@code maxMemorySizeInBytes} parameter limits the total size of uncompressed _sources + * loaded into memory during batch retrieval. + */ +public class LuceneSyntheticSourceChangesSnapshot extends SearchBasedChangesSnapshot { + public static final int DEFAULT_SEARCH_BATCH_SIZE = 1024; + public static final long DEFAULT_MEMORY_SIZE = 4 * 1024 * 1024; // 4MB + + private final long maxMemorySizeInBytes; + private final StoredFieldLoader storedFieldLoader; + private final SourceLoader sourceLoader; + + private int skippedOperations; + private long lastSeenSeqNo; + + private record SearchRecord(FieldDoc doc, boolean isTombstone, long seqNo, long primaryTerm, long version, long size) { + int index() { + return doc.shardIndex; + } + + int docID() { + return doc.doc; + } + + boolean hasRecoverySourceSize() { + return size != -1; + } + } + + private final Deque pendingDocs = new LinkedList<>(); + private final Deque operationQueue = new LinkedList<>(); + + public LuceneSyntheticSourceChangesSnapshot( + MappingLookup mappingLookup, + Engine.Searcher engineSearcher, + int searchBatchSize, + long maxMemorySizeInBytes, + long fromSeqNo, + long toSeqNo, + boolean requiredFullRange, + boolean accessStats, + IndexVersion indexVersionCreated + ) throws IOException { + super(engineSearcher, searchBatchSize, fromSeqNo, toSeqNo, requiredFullRange, accessStats, indexVersionCreated); + assert mappingLookup.isSourceSynthetic(); + this.maxMemorySizeInBytes = maxMemorySizeInBytes; + this.sourceLoader = mappingLookup.newSourceLoader(SourceFieldMetrics.NOOP); + Set storedFields = sourceLoader.requiredStoredFields(); + this.storedFieldLoader = StoredFieldLoader.create(mappingLookup.isSourceSynthetic() == false, storedFields); + this.lastSeenSeqNo = fromSeqNo - 1; + } + + @Override + public int skippedOperations() { + return skippedOperations; + } + + @Override + public Translog.Operation nextOperation() throws IOException { + while (true) { + if (operationQueue.isEmpty()) { + loadNextBatch(); + } + if (operationQueue.isEmpty()) { + return null; + } + var op = operationQueue.pollFirst(); + if (op.seqNo() == lastSeenSeqNo) { + skippedOperations++; + continue; + } + lastSeenSeqNo = op.seqNo(); + return op; + } + } + + private void loadNextBatch() throws IOException { + List documentsToLoad = new ArrayList<>(); + long accumulatedSize = 0; + boolean reachedMemoryLimit = false; + while (reachedMemoryLimit == false) { + if (pendingDocs.isEmpty()) { + ScoreDoc[] topDocs = nextTopDocs().scoreDocs; + if (topDocs.length == 0) { + break; + } + pendingDocs.addAll(Arrays.asList(transformScoreDocsToRecords(topDocs))); + } + SearchRecord document = pendingDocs.pollFirst(); + document.doc().shardIndex = documentsToLoad.size(); + documentsToLoad.add(document); + accumulatedSize += document.size(); + if (accumulatedSize > maxMemorySizeInBytes) { + reachedMemoryLimit = true; + } + } + + for (var op : loadDocuments(documentsToLoad)) { + if (op == null) { + skippedOperations++; + continue; + } + operationQueue.add(op); + } + } + + private SearchRecord[] transformScoreDocsToRecords(ScoreDoc[] scoreDocs) throws IOException { + ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(doc -> doc.doc)); + SearchRecord[] documentRecords = new SearchRecord[scoreDocs.length]; + CombinedDocValues combinedDocValues = null; + int docBase = -1; + int maxDoc = 0; + int readerIndex = 0; + 
LeafReaderContext leafReaderContext; + + for (int i = 0; i < scoreDocs.length; i++) { + ScoreDoc scoreDoc = scoreDocs[i]; + if (scoreDoc.doc >= docBase + maxDoc) { + do { + leafReaderContext = leaves().get(readerIndex++); + docBase = leafReaderContext.docBase; + maxDoc = leafReaderContext.reader().maxDoc(); + } while (scoreDoc.doc >= docBase + maxDoc); + combinedDocValues = new CombinedDocValues(leafReaderContext.reader()); + } + int segmentDocID = scoreDoc.doc - docBase; + int index = scoreDoc.shardIndex; + var primaryTerm = combinedDocValues.docPrimaryTerm(segmentDocID); + assert primaryTerm > 0 : "nested child document must be excluded"; + documentRecords[index] = new SearchRecord( + (FieldDoc) scoreDoc, + combinedDocValues.isTombstone(segmentDocID), + combinedDocValues.docSeqNo(segmentDocID), + primaryTerm, + combinedDocValues.docVersion(segmentDocID), + combinedDocValues.recoverySourceSize(segmentDocID) + ); + } + return documentRecords; + } + + private Translog.Operation[] loadDocuments(List documentRecords) throws IOException { + documentRecords.sort(Comparator.comparingInt(doc -> doc.docID())); + Translog.Operation[] operations = new Translog.Operation[documentRecords.size()]; + + int docBase = -1; + int maxDoc = 0; + int readerIndex = 0; + LeafReaderContext leafReaderContext = null; + LeafStoredFieldLoader leafFieldLoader = null; + SourceLoader.Leaf leafSourceLoader = null; + for (int i = 0; i < documentRecords.size(); i++) { + SearchRecord docRecord = documentRecords.get(i); + if (docRecord.docID() >= docBase + maxDoc) { + do { + leafReaderContext = leaves().get(readerIndex++); + docBase = leafReaderContext.docBase; + maxDoc = leafReaderContext.reader().maxDoc(); + } while (docRecord.docID() >= docBase + maxDoc); + + leafFieldLoader = storedFieldLoader.getLoader(leafReaderContext, null); + leafSourceLoader = sourceLoader.leaf(leafReaderContext.reader(), null); + } + int segmentDocID = docRecord.docID() - docBase; + leafFieldLoader.advanceTo(segmentDocID); + operations[docRecord.index()] = createOperation(docRecord, leafFieldLoader, leafSourceLoader, segmentDocID, leafReaderContext); + } + return operations; + } + + private Translog.Operation createOperation( + SearchRecord docRecord, + LeafStoredFieldLoader fieldLoader, + SourceLoader.Leaf sourceLoader, + int segmentDocID, + LeafReaderContext context + ) throws IOException { + if (docRecord.isTombstone() && fieldLoader.id() == null) { + assert docRecord.version() == 1L : "Noop tombstone should have version 1L; actual version [" + docRecord.version() + "]"; + assert assertDocSoftDeleted(context.reader(), segmentDocID) : "Noop but soft_deletes field is not set [" + docRecord + "]"; + return new Translog.NoOp(docRecord.seqNo(), docRecord.primaryTerm(), "null"); + } else if (docRecord.isTombstone()) { + assert assertDocSoftDeleted(context.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + docRecord + "]"; + return new Translog.Delete(fieldLoader.id(), docRecord.seqNo(), docRecord.primaryTerm(), docRecord.version()); + } else { + if (docRecord.hasRecoverySourceSize() == false) { + // TODO: Callers should ask for the range that source should be retained. Thus we should always + // check for the existence source once we make peer-recovery to send ops after the local checkpoint. 
+ if (requiredFullRange) { + throw new MissingHistoryOperationsException( + "source not found for seqno=" + docRecord.seqNo() + " from_seqno=" + fromSeqNo + " to_seqno=" + toSeqNo + ); + } else { + skippedOperations++; + return null; + } + } + BytesReference source = sourceLoader.source(fieldLoader, segmentDocID).internalSourceRef(); + return new Translog.Index( + fieldLoader.id(), + docRecord.seqNo(), + docRecord.primaryTerm(), + docRecord.version(), + source, + fieldLoader.routing(), + -1 // autogenerated timestamp + ); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicy.java b/server/src/main/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicy.java index 3e99818d1827b..dd1cea7516f03 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicy.java @@ -33,17 +33,18 @@ import org.apache.lucene.search.Weight; import org.apache.lucene.util.BitSet; import org.apache.lucene.util.BitSetIterator; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.search.internal.FilterStoredFieldVisitor; import java.io.IOException; import java.util.Arrays; -import java.util.Objects; import java.util.function.Supplier; final class RecoverySourcePruneMergePolicy extends OneMergeWrappingMergePolicy { RecoverySourcePruneMergePolicy( - String recoverySourceField, + @Nullable String recoverySourceField, + String recoverySourceSizeField, boolean pruneIdField, Supplier retainSourceQuerySupplier, MergePolicy in @@ -52,18 +53,21 @@ final class RecoverySourcePruneMergePolicy extends OneMergeWrappingMergePolicy { @Override public CodecReader wrapForMerge(CodecReader reader) throws IOException { CodecReader wrapped = toWrap.wrapForMerge(reader); - return wrapReader(recoverySourceField, pruneIdField, wrapped, retainSourceQuerySupplier); + return wrapReader(recoverySourceField, recoverySourceSizeField, pruneIdField, wrapped, retainSourceQuerySupplier); } }); } private static CodecReader wrapReader( String recoverySourceField, + String recoverySourceSizeField, boolean pruneIdField, CodecReader reader, Supplier retainSourceQuerySupplier ) throws IOException { - NumericDocValues recoverySource = reader.getNumericDocValues(recoverySourceField); + NumericDocValues recoverySource = reader.getNumericDocValues( + recoverySourceField != null ? recoverySourceField : recoverySourceSizeField + ); if (recoverySource == null || recoverySource.nextDoc() == DocIdSetIterator.NO_MORE_DOCS) { return reader; // early terminate - nothing to do here since non of the docs has a recovery source anymore. 
} @@ -78,21 +82,35 @@ private static CodecReader wrapReader( if (recoverySourceToKeep.cardinality() == reader.maxDoc()) { return reader; // keep all source } - return new SourcePruningFilterCodecReader(recoverySourceField, pruneIdField, reader, recoverySourceToKeep); + return new SourcePruningFilterCodecReader( + recoverySourceField, + recoverySourceSizeField, + pruneIdField, + reader, + recoverySourceToKeep + ); } else { - return new SourcePruningFilterCodecReader(recoverySourceField, pruneIdField, reader, null); + return new SourcePruningFilterCodecReader(recoverySourceField, recoverySourceSizeField, pruneIdField, reader, null); } } private static class SourcePruningFilterCodecReader extends FilterCodecReader { private final BitSet recoverySourceToKeep; private final String recoverySourceField; + private final String recoverySourceSizeField; private final boolean pruneIdField; - SourcePruningFilterCodecReader(String recoverySourceField, boolean pruneIdField, CodecReader reader, BitSet recoverySourceToKeep) { + SourcePruningFilterCodecReader( + String recoverySourceField, + String recoverySourceSizeField, + boolean pruneIdField, + CodecReader reader, + BitSet recoverySourceToKeep + ) { super(reader); this.recoverySourceField = recoverySourceField; this.recoverySourceToKeep = recoverySourceToKeep; + this.recoverySourceSizeField = recoverySourceSizeField; this.pruneIdField = pruneIdField; } @@ -103,7 +121,7 @@ public DocValuesProducer getDocValuesReader() { @Override public NumericDocValues getNumeric(FieldInfo field) throws IOException { NumericDocValues numeric = super.getNumeric(field); - if (recoverySourceField.equals(field.name)) { + if (field.name.equals(recoverySourceField) || field.name.equals(recoverySourceSizeField)) { assert numeric != null : recoverySourceField + " must have numeric DV but was null"; final DocIdSetIterator intersection; if (recoverySourceToKeep == null) { @@ -139,6 +157,9 @@ public boolean advanceExact(int target) { @Override public StoredFieldsReader getFieldsReader() { + if (recoverySourceField == null && pruneIdField == false) { + return super.getFieldsReader(); + } return new RecoverySourcePruningStoredFieldsReader( super.getFieldsReader(), recoverySourceToKeep, @@ -241,12 +262,13 @@ private static class RecoverySourcePruningStoredFieldsReader extends FilterStore RecoverySourcePruningStoredFieldsReader( StoredFieldsReader in, BitSet recoverySourceToKeep, - String recoverySourceField, + @Nullable String recoverySourceField, boolean pruneIdField ) { super(in); + assert recoverySourceField != null || pruneIdField : "nothing to prune"; this.recoverySourceToKeep = recoverySourceToKeep; - this.recoverySourceField = Objects.requireNonNull(recoverySourceField); + this.recoverySourceField = recoverySourceField; this.pruneIdField = pruneIdField; } @@ -258,7 +280,7 @@ public void document(int docID, StoredFieldVisitor visitor) throws IOException { super.document(docID, new FilterStoredFieldVisitor(visitor) { @Override public Status needsField(FieldInfo fieldInfo) throws IOException { - if (recoverySourceField.equals(fieldInfo.name)) { + if (fieldInfo.name.equals(recoverySourceField)) { return Status.NO; } if (pruneIdField && IdFieldMapper.NAME.equals(fieldInfo.name)) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshot.java new file mode 100644 index 0000000000000..4209654d2e79b --- /dev/null +++ 
b/server/src/main/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshot.java @@ -0,0 +1,233 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopFieldCollectorManager; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.search.Queries; +import org.elasticsearch.core.IOUtils; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.translog.Translog; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Abstract class that provides a snapshot mechanism to retrieve operations from a live Lucene index + * within a specified range of sequence numbers. Subclasses are expected to define the + * method to fetch the next batch of operations. + */ +public abstract class SearchBasedChangesSnapshot implements Translog.Snapshot, Closeable { + public static final int DEFAULT_BATCH_SIZE = 1024; + + private final IndexVersion indexVersionCreated; + private final IndexSearcher indexSearcher; + private final Closeable onClose; + + protected final long fromSeqNo, toSeqNo; + protected final boolean requiredFullRange; + protected final int searchBatchSize; + + private final boolean accessStats; + private final int totalHits; + private FieldDoc afterDoc; + private long lastSeenSeqNo; + + /** + * Constructs a new snapshot for fetching changes within a sequence number range. + * + * @param engineSearcher Engine searcher instance. + * @param searchBatchSize Number of documents to retrieve per batch. + * @param fromSeqNo Starting sequence number. + * @param toSeqNo Ending sequence number. + * @param requiredFullRange Whether the full range is required. + * @param accessStats If true, enable access statistics for counting total operations. + * @param indexVersionCreated Version of the index when it was created. 
+ */ + protected SearchBasedChangesSnapshot( + Engine.Searcher engineSearcher, + int searchBatchSize, + long fromSeqNo, + long toSeqNo, + boolean requiredFullRange, + boolean accessStats, + IndexVersion indexVersionCreated + ) throws IOException { + + if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) { + throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]"); + } + if (searchBatchSize <= 0) { + throw new IllegalArgumentException("Search_batch_size must be positive [" + searchBatchSize + "]"); + } + + this.indexVersionCreated = indexVersionCreated; + this.fromSeqNo = fromSeqNo; + this.toSeqNo = toSeqNo; + this.lastSeenSeqNo = fromSeqNo - 1; + this.requiredFullRange = requiredFullRange; + this.indexSearcher = newIndexSearcher(engineSearcher); + this.indexSearcher.setQueryCache(null); + + final AtomicBoolean closed = new AtomicBoolean(); + this.onClose = () -> { + if (closed.compareAndSet(false, true)) { + IOUtils.close(engineSearcher); + } + }; + + long requestingSize = (toSeqNo - fromSeqNo == Long.MAX_VALUE) ? Long.MAX_VALUE : (toSeqNo - fromSeqNo + 1L); + this.searchBatchSize = (int) Math.min(requestingSize, searchBatchSize); + + this.accessStats = accessStats; + this.totalHits = accessStats ? indexSearcher.count(rangeQuery(fromSeqNo, toSeqNo, indexVersionCreated)) : -1; + } + + /** + * Abstract method for retrieving the next operation. Should be implemented by subclasses. + * + * @return The next Translog.Operation in the snapshot. + * @throws IOException If an I/O error occurs. + */ + public abstract Translog.Operation nextOperation() throws IOException; + + /** + * Returns the list of index leaf reader contexts. + * + * @return List of LeafReaderContext. + */ + public List leaves() { + return indexSearcher.getIndexReader().leaves(); + } + + @Override + public int totalOperations() { + if (accessStats == false) { + throw new IllegalStateException("Access stats of a snapshot created with [access_stats] is false"); + } + return totalHits; + } + + @Override + public final Translog.Operation next() throws IOException { + Translog.Operation op = nextOperation(); + if (requiredFullRange) { + verifyRange(op); + } + if (op != null) { + assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() + : "Unexpected operation; last_seen_seqno [" + + lastSeenSeqNo + + "], from_seqno [" + + fromSeqNo + + "], to_seqno [" + + toSeqNo + + "], op [" + + op + + "]"; + lastSeenSeqNo = op.seqNo(); + } + return op; + } + + @Override + public void close() throws IOException { + onClose.close(); + } + + /** + * Retrieves the next batch of top documents based on the sequence range. + * + * @return TopDocs instance containing the documents in the current batch. 
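[Editor's aside] A minimal sketch of how a subclass plugs into nextTopDocs(); it surfaces every hit as a NoOp purely to show the batching, whereas the real subclasses in this change rebuild _source. The class name, the primary term of 0 and the reason string are illustrative assumptions, and the class is assumed to live in the same org.elasticsearch.index.engine package:

import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;

// Toy subclass for illustration only: turns every hit in the seq_no range into a NoOp.
final class SeqNoOnlySnapshot extends SearchBasedChangesSnapshot {
    private ScoreDoc[] batch = new ScoreDoc[0];
    private int pos = 0;

    SeqNoOnlySnapshot(Engine.Searcher searcher, int batchSize, long from, long to, IndexVersion version) throws IOException {
        super(searcher, batchSize, from, to, false, true, version);
    }

    @Override
    public Translog.Operation nextOperation() throws IOException {
        if (pos >= batch.length) {
            batch = nextTopDocs().scoreDocs;   // next page, sorted by _seq_no and paged via the after-doc
            pos = 0;
            if (batch.length == 0) {
                return null;                   // range exhausted
            }
        }
        FieldDoc hit = (FieldDoc) batch[pos++];
        long seqNo = (Long) hit.fields[0];     // sort value of the _seq_no SortField
        return new Translog.NoOp(seqNo, 0, "illustration");
    }
}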
+ */ + protected TopDocs nextTopDocs() throws IOException { + Query rangeQuery = rangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo, indexVersionCreated); + SortField sortBySeqNo = new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG); + + TopFieldCollectorManager collectorManager = new TopFieldCollectorManager( + new Sort(sortBySeqNo), + searchBatchSize, + afterDoc, + 0, + false + ); + TopDocs results = indexSearcher.search(rangeQuery, collectorManager); + + if (results.scoreDocs.length > 0) { + afterDoc = (FieldDoc) results.scoreDocs[results.scoreDocs.length - 1]; + } + for (int i = 0; i < results.scoreDocs.length; i++) { + results.scoreDocs[i].shardIndex = i; + } + return results; + } + + static IndexSearcher newIndexSearcher(Engine.Searcher engineSearcher) throws IOException { + return new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader())); + } + + static Query rangeQuery(long fromSeqNo, long toSeqNo, IndexVersion indexVersionCreated) { + return new BooleanQuery.Builder().add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST) + .add(Queries.newNonNestedFilter(indexVersionCreated), BooleanClause.Occur.MUST) + .build(); + } + + private void verifyRange(Translog.Operation op) { + if (op == null && lastSeenSeqNo < toSeqNo) { + throw new MissingHistoryOperationsException( + "Not all operations between from_seqno [" + + fromSeqNo + + "] " + + "and to_seqno [" + + toSeqNo + + "] found; prematurely terminated last_seen_seqno [" + + lastSeenSeqNo + + "]" + ); + } else if (op != null && op.seqNo() != lastSeenSeqNo + 1) { + throw new MissingHistoryOperationsException( + "Not all operations between from_seqno [" + + fromSeqNo + + "] " + + "and to_seqno [" + + toSeqNo + + "] found; expected seqno [" + + lastSeenSeqNo + + 1 + + "]; found [" + + op + + "]" + ); + } + } + + protected static boolean assertDocSoftDeleted(LeafReader leafReader, int segmentDocId) throws IOException { + NumericDocValues docValues = leafReader.getNumericDocValues(Lucene.SOFT_DELETES_FIELD); + if (docValues == null || docValues.advanceExact(segmentDocId) == false) { + throw new IllegalStateException("DocValues for field [" + Lucene.SOFT_DELETES_FIELD + "] is not found"); + } + return docValues.longValue() == 1; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/fieldvisitor/LeafStoredFieldLoader.java b/server/src/main/java/org/elasticsearch/index/fieldvisitor/LeafStoredFieldLoader.java index 11cf56e2304a7..3ed4c856ccc71 100644 --- a/server/src/main/java/org/elasticsearch/index/fieldvisitor/LeafStoredFieldLoader.java +++ b/server/src/main/java/org/elasticsearch/index/fieldvisitor/LeafStoredFieldLoader.java @@ -9,7 +9,6 @@ package org.elasticsearch.index.fieldvisitor; -import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.bytes.BytesReference; import java.io.IOException; @@ -48,6 +47,4 @@ public interface LeafStoredFieldLoader { * @return stored fields for the current document */ Map> storedFields(); - - CheckedBiConsumer reader(); } diff --git a/server/src/main/java/org/elasticsearch/index/fieldvisitor/StoredFieldLoader.java b/server/src/main/java/org/elasticsearch/index/fieldvisitor/StoredFieldLoader.java index 25a42edb98c0f..d41f65fd68fc2 100644 --- a/server/src/main/java/org/elasticsearch/index/fieldvisitor/StoredFieldLoader.java +++ b/server/src/main/java/org/elasticsearch/index/fieldvisitor/StoredFieldLoader.java @@ -185,12 +185,6 @@ public String routing() { public Map> storedFields() { return 
Collections.emptyMap(); } - - @Override - public CheckedBiConsumer reader() { - // do nothing - return (a, b) -> {}; - } } private static class ReaderStoredFieldLoader implements LeafStoredFieldLoader { @@ -232,11 +226,6 @@ public String routing() { public Map> storedFields() { return visitor.fields(); } - - @Override - public CheckedBiConsumer reader() { - return reader; - } } } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/SortedNumericDocValuesSyntheticFieldLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/SortedNumericDocValuesSyntheticFieldLoader.java index b06d5bac9da89..c3660061d13f4 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/SortedNumericDocValuesSyntheticFieldLoader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/SortedNumericDocValuesSyntheticFieldLoader.java @@ -16,7 +16,6 @@ import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; -import java.util.Arrays; import java.util.Map; import java.util.stream.Stream; diff --git a/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java index 372e0bbdfecf4..e28e16a7f4aec 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java @@ -58,6 +58,8 @@ public class SourceFieldMapper extends MetadataFieldMapper { public static final String NAME = "_source"; public static final String RECOVERY_SOURCE_NAME = "_recovery_source"; + public static final String RECOVERY_SOURCE_SIZE_NAME = "_recovery_source_size"; + public static final String CONTENT_TYPE = "_source"; public static final String LOSSY_PARAMETERS_ALLOWED_SETTING_NAME = "index.lossy.source-mapping-parameters"; @@ -393,8 +395,18 @@ public void preParse(DocumentParserContext context) throws IOException { if (enableRecoverySource && originalSource != null && adaptedSource != originalSource) { // if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery BytesRef ref = originalSource.toBytesRef(); - context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length)); - context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1)); + if (isSynthetic() && context.indexSettings().isRecoverySourceSyntheticEnabled()) { + /** + * We use synthetic source for recovery, so we omit the recovery source. + * Instead, we record only the size of the uncompressed source. + * This size is used in {@link LuceneSyntheticSourceChangesSnapshot} to control memory + * usage during the recovery process when loading a batch of synthetic sources. 
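[Editor's aside] To make the comment above concrete, a rough sketch (not the actual LuceneSyntheticSourceChangesSnapshot code) of how a reader can use the recorded per-document size to bound how many documents go into one recovery batch; the class and method names and the docIds input are assumptions:

import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.NumericDocValues;
import org.elasticsearch.index.mapper.SourceFieldMapper;

import java.io.IOException;

// Illustrative only: walk doc ids in ascending order and stop growing the batch once the
// summed uncompressed _source sizes reach the memory budget.
final class RecoveryBatchBudgetExample {
    static int sizeBoundedBatchEnd(LeafReader leafReader, int[] docIds, long maxBytes) throws IOException {
        NumericDocValues sizes = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_SIZE_NAME);
        long used = 0;
        int end = 0;
        for (int docId : docIds) {             // docIds must be ascending, as doc values iterate forward only
            if (sizes != null && sizes.advanceExact(docId)) {
                used += sizes.longValue();     // size recorded at index time instead of the full recovery source
            }
            end++;
            if (used >= maxBytes) {
                break;                         // remaining docs are left for the next batch
            }
        }
        return end;
    }
}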
+ */ + context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_SIZE_NAME, ref.length)); + } else { + context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length)); + context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1)); + } } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 1ec187ea4a34b..567ea19e46f2f 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -397,6 +397,13 @@ public Iterator> settings() { Property.NodeScope ); + public static final Setting INDICES_RECOVERY_SOURCE_SYNTHETIC_LOGS_ENABLED_SETTING = Setting.boolSetting( + "index.recovery.recovery_source.synthetic.logs.enabled", + true, + Property.Dynamic, + Property.NodeScope + ); + public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB); private volatile ByteSizeValue maxBytesPerSec; diff --git a/server/src/main/java/org/elasticsearch/search/fetch/FetchProfiler.java b/server/src/main/java/org/elasticsearch/search/fetch/FetchProfiler.java index 1f75d2dc4cd29..5e7526fe9b13f 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/FetchProfiler.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/FetchProfiler.java @@ -10,9 +10,7 @@ package org.elasticsearch.search.fetch; import org.apache.lucene.index.LeafReaderContext; -import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.index.fieldvisitor.FieldsVisitor; import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader; import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; import org.elasticsearch.search.fetch.FetchSubPhase.HitContext; @@ -100,11 +98,6 @@ public String routing() { public Map> storedFields() { return in.storedFields(); } - - @Override - public CheckedBiConsumer reader() { - return in.reader(); - } }; } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index bba1fa338559f..edf15cd6d6ac9 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3461,7 +3461,7 @@ public void testTranslogReplay() throws IOException { assertThat(indexResult.getVersion(), equalTo(1L)); } assertVisibleCount(engine, numDocs); - translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings()); + translogHandler = createTranslogHandler(mapperService); engine.close(); // we need to reuse the engine config unless the parser.mappingModified won't work @@ -3473,7 +3473,7 @@ public void testTranslogReplay() throws IOException { assertEquals(numDocs, translogHandler.appliedOperations()); engine.close(); - translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings()); + translogHandler = createTranslogHandler(mapperService); engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier); engine.refresh("warm_up"); assertVisibleCount(engine, numDocs, false); @@ -3527,7 +3527,7 @@ public void testTranslogReplay() throws IOException { } engine.close(); - translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings()); + translogHandler = 
createTranslogHandler(mapperService); engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier); engine.refresh("warm_up"); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java index 85ba368165ceb..d78f0bed647ca 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -10,289 +10,36 @@ package org.elasticsearch.index.engine; import org.apache.lucene.index.NoMergePolicy; -import org.elasticsearch.common.Randomness; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.IOUtils; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.index.mapper.ParsedDocument; -import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.MappingLookup; import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.translog.SnapshotMatchers; import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.test.IndexSettingsModule; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.LongSupplier; - -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; - -public class LuceneChangesSnapshotTests extends EngineTestCase { +public class LuceneChangesSnapshotTests extends SearchBasedChangesSnapshotTests { @Override - protected Settings indexSettings() { - return Settings.builder() - .put(super.indexSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) // always enable soft-deletes - .build(); - } - - public void testBasics() throws Exception { - long fromSeqNo = randomNonNegativeLong(); - long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE); - // Empty engine - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean(), randomBoolean())) { - IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); - assertThat( - error.getMessage(), - containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found") - ); - } - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, false, randomBoolean(), randomBoolean())) { - assertThat(snapshot, SnapshotMatchers.size(0)); - } - int numOps = between(1, 100); - int refreshedSeqNo = -1; - for (int i = 0; i < numOps; i++) { - String id = Integer.toString(randomIntBetween(i, i + 5)); - ParsedDocument doc = createParsedDoc(id, null, randomBoolean()); - if (randomBoolean()) { - engine.index(indexForDoc(doc)); - } else { - engine.delete(new Engine.Delete(doc.id(), Uid.encodeId(doc.id()), primaryTerm.get())); - } - if (rarely()) { - if (randomBoolean()) { - engine.flush(); - } else { - engine.refresh("test"); - } - refreshedSeqNo = i; - } - } - if (refreshedSeqNo == -1) { - fromSeqNo = between(0, numOps); - toSeqNo = randomLongBetween(fromSeqNo, numOps * 2); - - Engine.Searcher searcher = engine.acquireSearcher("test", 
Engine.SearcherScope.INTERNAL); - try ( - Translog.Snapshot snapshot = new LuceneChangesSnapshot( - searcher, - between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), - fromSeqNo, - toSeqNo, - false, - randomBoolean(), - randomBoolean(), - IndexVersion.current() - ) - ) { - searcher = null; - assertThat(snapshot, SnapshotMatchers.size(0)); - } finally { - IOUtils.close(searcher); - } - - searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); - try ( - Translog.Snapshot snapshot = new LuceneChangesSnapshot( - searcher, - between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), - fromSeqNo, - toSeqNo, - true, - randomBoolean(), - randomBoolean(), - IndexVersion.current() - ) - ) { - searcher = null; - IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); - assertThat( - error.getMessage(), - containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found") - ); - } finally { - IOUtils.close(searcher); - } - } else { - fromSeqNo = randomLongBetween(0, refreshedSeqNo); - toSeqNo = randomLongBetween(refreshedSeqNo + 1, numOps * 2); - Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); - try ( - Translog.Snapshot snapshot = new LuceneChangesSnapshot( - searcher, - between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), - fromSeqNo, - toSeqNo, - false, - randomBoolean(), - randomBoolean(), - IndexVersion.current() - ) - ) { - searcher = null; - assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, refreshedSeqNo)); - } finally { - IOUtils.close(searcher); - } - searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); - try ( - Translog.Snapshot snapshot = new LuceneChangesSnapshot( - searcher, - between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), - fromSeqNo, - toSeqNo, - true, - randomBoolean(), - randomBoolean(), - IndexVersion.current() - ) - ) { - searcher = null; - IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); - assertThat( - error.getMessage(), - containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found") - ); - } finally { - IOUtils.close(searcher); - } - toSeqNo = randomLongBetween(fromSeqNo, refreshedSeqNo); - searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); - try ( - Translog.Snapshot snapshot = new LuceneChangesSnapshot( - searcher, - between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), - fromSeqNo, - toSeqNo, - true, - randomBoolean(), - randomBoolean(), - IndexVersion.current() - ) - ) { - searcher = null; - assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo)); - } finally { - IOUtils.close(searcher); - } - } - // Get snapshot via engine will auto refresh - fromSeqNo = randomLongBetween(0, numOps - 1); - toSeqNo = randomLongBetween(fromSeqNo, numOps - 1); - try ( - Translog.Snapshot snapshot = engine.newChangesSnapshot( - "test", - fromSeqNo, - toSeqNo, - randomBoolean(), - randomBoolean(), - randomBoolean() - ) - ) { - assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo)); - } - } - - /** - * A nested document is indexed into Lucene as multiple documents. While the root document has both sequence number and primary term, - * non-root documents don't have primary term but only sequence numbers. 
This test verifies that {@link LuceneChangesSnapshot} - * correctly skip non-root documents and returns at most one operation per sequence number. - */ - public void testSkipNonRootOfNestedDocuments() throws Exception { - Map seqNoToTerm = new HashMap<>(); - List operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean()); - for (Engine.Operation op : operations) { - if (engine.getLocalCheckpointTracker().hasProcessed(op.seqNo()) == false) { - seqNoToTerm.put(op.seqNo(), op.primaryTerm()); - } - applyOperation(engine, op); - if (rarely()) { - engine.refresh("test"); - } - if (rarely()) { - engine.rollTranslogGeneration(); - } - if (rarely()) { - engine.flush(); - } - } - long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo(); - engine.refresh("test"); - Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); - final boolean accessStats = randomBoolean(); - try ( - Translog.Snapshot snapshot = new LuceneChangesSnapshot( - searcher, - between(1, 100), - 0, - maxSeqNo, - false, - randomBoolean(), - accessStats, - IndexVersion.current() - ) - ) { - if (accessStats) { - assertThat(snapshot.totalOperations(), equalTo(seqNoToTerm.size())); - } - Translog.Operation op; - while ((op = snapshot.next()) != null) { - assertThat(op.toString(), op.primaryTerm(), equalTo(seqNoToTerm.get(op.seqNo()))); - } - assertThat(snapshot.skippedOperations(), equalTo(0)); - } - } - - public void testUpdateAndReadChangesConcurrently() throws Exception { - Follower[] followers = new Follower[between(1, 3)]; - CountDownLatch readyLatch = new CountDownLatch(followers.length + 1); - AtomicBoolean isDone = new AtomicBoolean(); - for (int i = 0; i < followers.length; i++) { - followers[i] = new Follower(engine, isDone, readyLatch); - followers[i].start(); - } - boolean onPrimary = randomBoolean(); - List operations = new ArrayList<>(); - int numOps = frequently() ? scaledRandomIntBetween(1, 1500) : scaledRandomIntBetween(5000, 20_000); - for (int i = 0; i < numOps; i++) { - String id = Integer.toString(randomIntBetween(0, randomBoolean() ? 
10 : numOps * 2)); - ParsedDocument doc = createParsedDoc(id, randomAlphaOfLengthBetween(1, 5), randomBoolean()); - final Engine.Operation op; - if (onPrimary) { - if (randomBoolean()) { - op = new Engine.Index(newUid(doc), primaryTerm.get(), doc); - } else { - op = new Engine.Delete(doc.id(), Uid.encodeId(doc.id()), primaryTerm.get()); - } - } else { - if (randomBoolean()) { - op = replicaIndexForDoc(doc, randomNonNegativeLong(), i, randomBoolean()); - } else { - op = replicaDeleteForDoc(doc.id(), randomNonNegativeLong(), i, randomNonNegativeLong()); - } - } - operations.add(op); - } - readyLatch.countDown(); - readyLatch.await(); - Randomness.shuffle(operations); - concurrentlyApplyOps(operations, engine); - assertThat(engine.getLocalCheckpointTracker().getProcessedCheckpoint(), equalTo(operations.size() - 1L)); - isDone.set(true); - for (Follower follower : followers) { - follower.join(); - IOUtils.close(follower.engine, follower.engine.store); - } + protected Translog.Snapshot newRandomSnapshot( + MappingLookup mappingLookup, + Engine.Searcher engineSearcher, + int searchBatchSize, + long fromSeqNo, + long toSeqNo, + boolean requiredFullRange, + boolean singleConsumer, + boolean accessStats, + IndexVersion indexVersionCreated + ) throws IOException { + return new LuceneChangesSnapshot( + engineSearcher, + searchBatchSize, + fromSeqNo, + toSeqNo, + requiredFullRange, + singleConsumer, + accessStats, + indexVersionCreated + ); } public void testAccessStoredFieldsSequentially() throws Exception { @@ -377,165 +124,4 @@ public void testAccessStoredFieldsSequentially() throws Exception { } } } - - class Follower extends Thread { - private final InternalEngine leader; - private final InternalEngine engine; - private final TranslogHandler translogHandler; - private final AtomicBoolean isDone; - private final CountDownLatch readLatch; - - Follower(InternalEngine leader, AtomicBoolean isDone, CountDownLatch readLatch) throws IOException { - this.leader = leader; - this.isDone = isDone; - this.readLatch = readLatch; - this.translogHandler = new TranslogHandler( - xContentRegistry(), - IndexSettingsModule.newIndexSettings(shardId.getIndexName(), leader.engineConfig.getIndexSettings().getSettings()) - ); - this.engine = createEngine(createStore(), createTempDir()); - } - - void pullOperations(InternalEngine follower) throws IOException { - long leaderCheckpoint = leader.getLocalCheckpointTracker().getProcessedCheckpoint(); - long followerCheckpoint = follower.getLocalCheckpointTracker().getProcessedCheckpoint(); - if (followerCheckpoint < leaderCheckpoint) { - long fromSeqNo = followerCheckpoint + 1; - long batchSize = randomLongBetween(0, 100); - long toSeqNo = Math.min(fromSeqNo + batchSize, leaderCheckpoint); - try ( - Translog.Snapshot snapshot = leader.newChangesSnapshot( - "test", - fromSeqNo, - toSeqNo, - true, - randomBoolean(), - randomBoolean() - ) - ) { - translogHandler.run(follower, snapshot); - } - } - } - - @Override - public void run() { - try { - readLatch.countDown(); - readLatch.await(); - while (isDone.get() == false - || engine.getLocalCheckpointTracker().getProcessedCheckpoint() < leader.getLocalCheckpointTracker() - .getProcessedCheckpoint()) { - pullOperations(engine); - } - assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine); - // have to verify without source since we are randomly testing without _source - List docsWithoutSourceOnFollower = getDocIds(engine, true).stream() - .map(d -> new DocIdSeqNoAndSource(d.id(), null, d.seqNo(), d.primaryTerm(), d.version())) 
- .toList(); - List docsWithoutSourceOnLeader = getDocIds(leader, true).stream() - .map(d -> new DocIdSeqNoAndSource(d.id(), null, d.seqNo(), d.primaryTerm(), d.version())) - .toList(); - assertThat(docsWithoutSourceOnFollower, equalTo(docsWithoutSourceOnLeader)); - } catch (Exception ex) { - throw new AssertionError(ex); - } - } - } - - private List drainAll(Translog.Snapshot snapshot) throws IOException { - List operations = new ArrayList<>(); - Translog.Operation op; - while ((op = snapshot.next()) != null) { - final Translog.Operation newOp = op; - logger.trace("Reading [{}]", op); - assert operations.stream().allMatch(o -> o.seqNo() < newOp.seqNo()) : "Operations [" + operations + "], op [" + op + "]"; - operations.add(newOp); - } - return operations; - } - - public void testOverFlow() throws Exception { - long fromSeqNo = randomLongBetween(0, 5); - long toSeqNo = randomLongBetween(Long.MAX_VALUE - 5, Long.MAX_VALUE); - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean(), randomBoolean())) { - IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); - assertThat( - error.getMessage(), - containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found") - ); - } - } - - public void testStats() throws Exception { - try (Store store = createStore(); Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) { - int numOps = between(100, 5000); - long startingSeqNo = randomLongBetween(0, Integer.MAX_VALUE); - List operations = generateHistoryOnReplica( - numOps, - startingSeqNo, - randomBoolean(), - randomBoolean(), - randomBoolean() - ); - applyOperations(engine, operations); - - LongSupplier fromSeqNo = () -> { - if (randomBoolean()) { - return 0L; - } else if (randomBoolean()) { - return startingSeqNo; - } else { - return randomLongBetween(0, startingSeqNo); - } - }; - - LongSupplier toSeqNo = () -> { - final long maxSeqNo = engine.getSeqNoStats(-1).getMaxSeqNo(); - if (randomBoolean()) { - return maxSeqNo; - } else if (randomBoolean()) { - return Long.MAX_VALUE; - } else { - return randomLongBetween(maxSeqNo, Long.MAX_VALUE); - } - }; - // Can't access stats if didn't request it - try ( - Translog.Snapshot snapshot = engine.newChangesSnapshot( - "test", - fromSeqNo.getAsLong(), - toSeqNo.getAsLong(), - false, - randomBoolean(), - false - ) - ) { - IllegalStateException error = expectThrows(IllegalStateException.class, snapshot::totalOperations); - assertThat(error.getMessage(), equalTo("Access stats of a snapshot created with [access_stats] is false")); - final List translogOps = drainAll(snapshot); - assertThat(translogOps, hasSize(numOps)); - error = expectThrows(IllegalStateException.class, snapshot::totalOperations); - assertThat(error.getMessage(), equalTo("Access stats of a snapshot created with [access_stats] is false")); - } - // Access stats and operations - try ( - Translog.Snapshot snapshot = engine.newChangesSnapshot( - "test", - fromSeqNo.getAsLong(), - toSeqNo.getAsLong(), - false, - randomBoolean(), - true - ) - ) { - assertThat(snapshot.totalOperations(), equalTo(numOps)); - final List translogOps = drainAll(snapshot); - assertThat(translogOps, hasSize(numOps)); - assertThat(snapshot.totalOperations(), equalTo(numOps)); - } - // Verify count - assertThat(engine.countChanges("test", fromSeqNo.getAsLong(), toSeqNo.getAsLong()), equalTo(numOps)); - } - } } diff --git 
a/server/src/test/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshotTests.java new file mode 100644 index 0000000000000..d2f2de2ba8fc5 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshotTests.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.mapper.MappingLookup; +import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.translog.Translog; + +import java.io.IOException; + +import static org.elasticsearch.index.mapper.SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING; + +public class LuceneSyntheticSourceChangesSnapshotTests extends SearchBasedChangesSnapshotTests { + @Override + protected Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(INDEX_MAPPER_SOURCE_MODE_SETTING.getKey(), SourceFieldMapper.Mode.SYNTHETIC.name()) + .put(IndexSettings.INDICES_RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING.getKey(), true) + .build(); + } + + @Override + protected Translog.Snapshot newRandomSnapshot( + MappingLookup mappingLookup, + Engine.Searcher engineSearcher, + int searchBatchSize, + long fromSeqNo, + long toSeqNo, + boolean requiredFullRange, + boolean singleConsumer, + boolean accessStats, + IndexVersion indexVersionCreated + ) throws IOException { + return new LuceneSyntheticSourceChangesSnapshot( + mappingLookup, + engineSearcher, + searchBatchSize, + randomLongBetween(1, LuceneSyntheticSourceChangesSnapshot.DEFAULT_MEMORY_SIZE), + fromSeqNo, + toSeqNo, + requiredFullRange, + accessStats, + indexVersionCreated + ); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicyTests.java index c0e365909429a..fdb9f4dea8414 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicyTests.java @@ -39,83 +39,99 @@ import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + public class RecoverySourcePruneMergePolicyTests extends ESTestCase { public void testPruneAll() throws IOException { - try (Directory dir = newDirectory()) { - boolean pruneIdField = randomBoolean(); - IndexWriterConfig iwc = newIndexWriterConfig(); - RecoverySourcePruneMergePolicy mp = new RecoverySourcePruneMergePolicy( - "extra_source", - pruneIdField, - MatchNoDocsQuery::new, - newLogMergePolicy() - ); - iwc.setMergePolicy(new ShuffleForcedMergePolicy(mp)); - try (IndexWriter writer = 
new IndexWriter(dir, iwc)) { - for (int i = 0; i < 20; i++) { - if (i > 0 && randomBoolean()) { - writer.flush(); - } - Document doc = new Document(); - doc.add(new StoredField(IdFieldMapper.NAME, "_id")); - doc.add(new StoredField("source", "hello world")); - doc.add(new StoredField("extra_source", "hello world")); - doc.add(new NumericDocValuesField("extra_source", 1)); - writer.addDocument(doc); - } - writer.forceMerge(1); - writer.commit(); - try (DirectoryReader reader = DirectoryReader.open(writer)) { - StoredFields storedFields = reader.storedFields(); - for (int i = 0; i < reader.maxDoc(); i++) { - Document document = storedFields.document(i); - if (pruneIdField) { - assertEquals(1, document.getFields().size()); - assertEquals("source", document.getFields().get(0).name()); - } else { - assertEquals(2, document.getFields().size()); - assertEquals(IdFieldMapper.NAME, document.getFields().get(0).name()); - assertEquals("source", document.getFields().get(1).name()); + for (boolean pruneIdField : List.of(true, false)) { + for (boolean syntheticRecoverySource : List.of(true, false)) { + try (Directory dir = newDirectory()) { + IndexWriterConfig iwc = newIndexWriterConfig(); + RecoverySourcePruneMergePolicy mp = new RecoverySourcePruneMergePolicy( + syntheticRecoverySource ? null : "extra_source", + "extra_source_size", + pruneIdField, + MatchNoDocsQuery::new, + newLogMergePolicy() + ); + iwc.setMergePolicy(new ShuffleForcedMergePolicy(mp)); + try (IndexWriter writer = new IndexWriter(dir, iwc)) { + for (int i = 0; i < 20; i++) { + if (i > 0 && randomBoolean()) { + writer.flush(); + } + Document doc = new Document(); + doc.add(new StoredField(IdFieldMapper.NAME, "_id")); + doc.add(new StoredField("source", "hello world")); + if (syntheticRecoverySource) { + doc.add(new NumericDocValuesField("extra_source_size", randomIntBetween(10, 10000))); + } else { + doc.add(new StoredField("extra_source", "hello world")); + doc.add(new NumericDocValuesField("extra_source", 1)); + } + writer.addDocument(doc); } - } - assertEquals(1, reader.leaves().size()); - LeafReader leafReader = reader.leaves().get(0).reader(); - NumericDocValues extra_source = leafReader.getNumericDocValues("extra_source"); - if (extra_source != null) { - assertEquals(DocIdSetIterator.NO_MORE_DOCS, extra_source.nextDoc()); - } - if (leafReader instanceof CodecReader codecReader && reader instanceof StandardDirectoryReader sdr) { - SegmentInfos segmentInfos = sdr.getSegmentInfos(); - MergePolicy.MergeSpecification forcedMerges = mp.findForcedDeletesMerges( - segmentInfos, - new MergePolicy.MergeContext() { - @Override - public int numDeletesToMerge(SegmentCommitInfo info) { - return info.info.maxDoc() - 1; + writer.forceMerge(1); + writer.commit(); + try (DirectoryReader reader = DirectoryReader.open(writer)) { + StoredFields storedFields = reader.storedFields(); + for (int i = 0; i < reader.maxDoc(); i++) { + Document document = storedFields.document(i); + if (pruneIdField) { + assertEquals(1, document.getFields().size()); + assertEquals("source", document.getFields().get(0).name()); + } else { + assertEquals(2, document.getFields().size()); + assertEquals(IdFieldMapper.NAME, document.getFields().get(0).name()); + assertEquals("source", document.getFields().get(1).name()); } + } - @Override - public int numDeletedDocs(SegmentCommitInfo info) { - return info.info.maxDoc() - 1; - } + assertEquals(1, reader.leaves().size()); + LeafReader leafReader = reader.leaves().get(0).reader(); - @Override - public InfoStream getInfoStream() { 
- return new NullInfoStream(); - } + NumericDocValues extra_source = leafReader.getNumericDocValues( + syntheticRecoverySource ? "extra_source_size" : "extra_source" + ); + if (extra_source != null) { + assertEquals(DocIdSetIterator.NO_MORE_DOCS, extra_source.nextDoc()); + } + if (leafReader instanceof CodecReader codecReader && reader instanceof StandardDirectoryReader sdr) { + SegmentInfos segmentInfos = sdr.getSegmentInfos(); + MergePolicy.MergeSpecification forcedMerges = mp.findForcedDeletesMerges( + segmentInfos, + new MergePolicy.MergeContext() { + @Override + public int numDeletesToMerge(SegmentCommitInfo info) { + return info.info.maxDoc() - 1; + } - @Override - public Set getMergingSegments() { - return Collections.emptySet(); - } + @Override + public int numDeletedDocs(SegmentCommitInfo info) { + return info.info.maxDoc() - 1; + } + + @Override + public InfoStream getInfoStream() { + return new NullInfoStream(); + } + + @Override + public Set getMergingSegments() { + return Collections.emptySet(); + } + } + ); + // don't wrap if there is nothing to do + assertSame(codecReader, forcedMerges.merges.get(0).wrapForMerge(codecReader)); } - ); - // don't wrap if there is nothing to do - assertSame(codecReader, forcedMerges.merges.get(0).wrapForMerge(codecReader)); + } } } } @@ -123,87 +139,126 @@ public Set getMergingSegments() { } public void testPruneSome() throws IOException { - try (Directory dir = newDirectory()) { - boolean pruneIdField = randomBoolean(); - IndexWriterConfig iwc = newIndexWriterConfig(); - iwc.setMergePolicy( - new RecoverySourcePruneMergePolicy( - "extra_source", - pruneIdField, - () -> new TermQuery(new Term("even", "true")), - iwc.getMergePolicy() - ) - ); - try (IndexWriter writer = new IndexWriter(dir, iwc)) { - for (int i = 0; i < 20; i++) { - if (i > 0 && randomBoolean()) { - writer.flush(); - } - Document doc = new Document(); - doc.add(new StoredField(IdFieldMapper.NAME, "_id")); - doc.add(new StringField("even", Boolean.toString(i % 2 == 0), Field.Store.YES)); - doc.add(new StoredField("source", "hello world")); - doc.add(new StoredField("extra_source", "hello world")); - doc.add(new NumericDocValuesField("extra_source", 1)); - writer.addDocument(doc); - } - writer.forceMerge(1); - writer.commit(); - try (DirectoryReader reader = DirectoryReader.open(writer)) { - assertEquals(1, reader.leaves().size()); - NumericDocValues extra_source = reader.leaves().get(0).reader().getNumericDocValues("extra_source"); - assertNotNull(extra_source); - StoredFields storedFields = reader.storedFields(); - for (int i = 0; i < reader.maxDoc(); i++) { - Document document = storedFields.document(i); - Set collect = document.getFields().stream().map(IndexableField::name).collect(Collectors.toSet()); - assertTrue(collect.contains("source")); - assertTrue(collect.contains("even")); - if (collect.size() == 4) { - assertTrue(collect.contains("extra_source")); - assertTrue(collect.contains(IdFieldMapper.NAME)); - assertEquals("true", document.getField("even").stringValue()); - assertEquals(i, extra_source.nextDoc()); - } else { - assertEquals(pruneIdField ? 2 : 3, document.getFields().size()); + for (boolean pruneIdField : List.of(true, false)) { + for (boolean syntheticRecoverySource : List.of(true, false)) { + try (Directory dir = newDirectory()) { + IndexWriterConfig iwc = newIndexWriterConfig(); + iwc.setMergePolicy( + new RecoverySourcePruneMergePolicy( + syntheticRecoverySource ? 
null : "extra_source", + "extra_source_size", + pruneIdField, + () -> new TermQuery(new Term("even", "true")), + iwc.getMergePolicy() + ) + ); + try (IndexWriter writer = new IndexWriter(dir, iwc)) { + for (int i = 0; i < 20; i++) { + if (i > 0 && randomBoolean()) { + writer.flush(); + } + Document doc = new Document(); + doc.add(new StoredField(IdFieldMapper.NAME, "_id")); + doc.add(new StringField("even", Boolean.toString(i % 2 == 0), Field.Store.YES)); + doc.add(new StoredField("source", "hello world")); + if (syntheticRecoverySource) { + doc.add(new NumericDocValuesField("extra_source_size", randomIntBetween(10, 10000))); + } else { + doc.add(new StoredField("extra_source", "hello world")); + doc.add(new NumericDocValuesField("extra_source", 1)); + } + writer.addDocument(doc); + } + writer.forceMerge(1); + writer.commit(); + try (DirectoryReader reader = DirectoryReader.open(writer)) { + assertEquals(1, reader.leaves().size()); + String extraSourceDVName = syntheticRecoverySource ? "extra_source_size" : "extra_source"; + NumericDocValues extra_source = reader.leaves().get(0).reader().getNumericDocValues(extraSourceDVName); + assertNotNull(extra_source); + StoredFields storedFields = reader.storedFields(); + for (int i = 0; i < reader.maxDoc(); i++) { + Document document = storedFields.document(i); + Set collect = document.getFields().stream().map(IndexableField::name).collect(Collectors.toSet()); + assertTrue(collect.contains("source")); + assertTrue(collect.contains("even")); + boolean isEven = Boolean.parseBoolean(document.getField("even").stringValue()); + if (isEven) { + assertTrue(collect.contains(IdFieldMapper.NAME)); + assertThat(collect.contains("extra_source"), equalTo(syntheticRecoverySource == false)); + if (extra_source.docID() < i) { + extra_source.advance(i); + } + assertEquals(i, extra_source.docID()); + if (syntheticRecoverySource) { + assertThat(extra_source.longValue(), greaterThan(10L)); + } else { + assertThat(extra_source.longValue(), equalTo(1L)); + } + } else { + assertThat(collect.contains(IdFieldMapper.NAME), equalTo(pruneIdField == false)); + assertFalse(collect.contains("extra_source")); + if (extra_source.docID() < i) { + extra_source.advance(i); + } + assertNotEquals(i, extra_source.docID()); + } + } + if (extra_source.docID() != DocIdSetIterator.NO_MORE_DOCS) { + assertEquals(DocIdSetIterator.NO_MORE_DOCS, extra_source.nextDoc()); + } } } - assertEquals(DocIdSetIterator.NO_MORE_DOCS, extra_source.nextDoc()); } } } } public void testPruneNone() throws IOException { - try (Directory dir = newDirectory()) { - IndexWriterConfig iwc = newIndexWriterConfig(); - iwc.setMergePolicy(new RecoverySourcePruneMergePolicy("extra_source", false, MatchAllDocsQuery::new, iwc.getMergePolicy())); - try (IndexWriter writer = new IndexWriter(dir, iwc)) { - for (int i = 0; i < 20; i++) { - if (i > 0 && randomBoolean()) { - writer.flush(); + for (boolean syntheticRecoverySource : List.of(true, false)) { + try (Directory dir = newDirectory()) { + IndexWriterConfig iwc = newIndexWriterConfig(); + iwc.setMergePolicy( + new RecoverySourcePruneMergePolicy( + syntheticRecoverySource ? 
null : "extra_source", + "extra_source_size", + false, + MatchAllDocsQuery::new, + iwc.getMergePolicy() + ) + ); + try (IndexWriter writer = new IndexWriter(dir, iwc)) { + for (int i = 0; i < 20; i++) { + if (i > 0 && randomBoolean()) { + writer.flush(); + } + Document doc = new Document(); + doc.add(new StoredField("source", "hello world")); + if (syntheticRecoverySource) { + doc.add(new NumericDocValuesField("extra_source_size", randomIntBetween(10, 10000))); + } else { + doc.add(new StoredField("extra_source", "hello world")); + doc.add(new NumericDocValuesField("extra_source", 1)); + } + writer.addDocument(doc); } - Document doc = new Document(); - doc.add(new StoredField("source", "hello world")); - doc.add(new StoredField("extra_source", "hello world")); - doc.add(new NumericDocValuesField("extra_source", 1)); - writer.addDocument(doc); - } - writer.forceMerge(1); - writer.commit(); - try (DirectoryReader reader = DirectoryReader.open(writer)) { - assertEquals(1, reader.leaves().size()); - NumericDocValues extra_source = reader.leaves().get(0).reader().getNumericDocValues("extra_source"); - assertNotNull(extra_source); - StoredFields storedFields = reader.storedFields(); - for (int i = 0; i < reader.maxDoc(); i++) { - Document document = storedFields.document(i); - Set collect = document.getFields().stream().map(IndexableField::name).collect(Collectors.toSet()); - assertTrue(collect.contains("source")); - assertTrue(collect.contains("extra_source")); - assertEquals(i, extra_source.nextDoc()); + writer.forceMerge(1); + writer.commit(); + try (DirectoryReader reader = DirectoryReader.open(writer)) { + assertEquals(1, reader.leaves().size()); + String extraSourceDVName = syntheticRecoverySource ? "extra_source_size" : "extra_source"; + NumericDocValues extra_source = reader.leaves().get(0).reader().getNumericDocValues(extraSourceDVName); + assertNotNull(extra_source); + StoredFields storedFields = reader.storedFields(); + for (int i = 0; i < reader.maxDoc(); i++) { + Document document = storedFields.document(i); + Set collect = document.getFields().stream().map(IndexableField::name).collect(Collectors.toSet()); + assertTrue(collect.contains("source")); + assertThat(collect.contains("extra_source"), equalTo(syntheticRecoverySource == false)); + assertEquals(i, extra_source.nextDoc()); + } + assertEquals(DocIdSetIterator.NO_MORE_DOCS, extra_source.nextDoc()); } - assertEquals(DocIdSetIterator.NO_MORE_DOCS, extra_source.nextDoc()); } } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshotTests.java new file mode 100644 index 0000000000000..9db6f224538b7 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshotTests.java @@ -0,0 +1,472 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.NoMergePolicy; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.IOUtils; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.mapper.MappingLookup; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.SnapshotMatchers; +import org.elasticsearch.index.translog.Translog; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongSupplier; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +public abstract class SearchBasedChangesSnapshotTests extends EngineTestCase { + @Override + protected Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) // always enable soft-deletes + .build(); + } + + protected abstract Translog.Snapshot newRandomSnapshot( + MappingLookup mappingLookup, + Engine.Searcher engineSearcher, + int searchBatchSize, + long fromSeqNo, + long toSeqNo, + boolean requiredFullRange, + boolean singleConsumer, + boolean accessStats, + IndexVersion indexVersionCreated + ) throws IOException; + + public void testBasics() throws Exception { + long fromSeqNo = randomNonNegativeLong(); + long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE); + // Empty engine + try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean(), randomBoolean())) { + IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); + assertThat( + error.getMessage(), + containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found") + ); + } + try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, false, randomBoolean(), randomBoolean())) { + assertThat(snapshot, SnapshotMatchers.size(0)); + } + int numOps = between(1, 100); + int refreshedSeqNo = -1; + for (int i = 0; i < numOps; i++) { + String id = Integer.toString(randomIntBetween(i, i + 5)); + ParsedDocument doc = parseDocument(engine.engineConfig.getMapperService(), id, null); + if (randomBoolean()) { + engine.index(indexForDoc(doc)); + } else { + engine.delete(new Engine.Delete(doc.id(), Uid.encodeId(doc.id()), primaryTerm.get())); + } + if (rarely()) { + if (randomBoolean()) { + engine.flush(); + } else { + engine.refresh("test"); + } + refreshedSeqNo = i; + } + } + if (refreshedSeqNo == -1) { + fromSeqNo = between(0, numOps); + toSeqNo = randomLongBetween(fromSeqNo, numOps * 2); + + Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + try ( + Translog.Snapshot snapshot = newRandomSnapshot( + engine.engineConfig.getMapperService().mappingLookup(), + searcher, + between(1, SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE), + fromSeqNo, + toSeqNo, + false, + randomBoolean(), + randomBoolean(), + IndexVersion.current() + ) + ) { + searcher = null; + assertThat(snapshot, SnapshotMatchers.size(0)); + } finally 
{ + IOUtils.close(searcher); + } + + searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + try ( + Translog.Snapshot snapshot = newRandomSnapshot( + engine.engineConfig.getMapperService().mappingLookup(), + searcher, + between(1, SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE), + fromSeqNo, + toSeqNo, + true, + randomBoolean(), + randomBoolean(), + IndexVersion.current() + ) + ) { + searcher = null; + IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); + assertThat( + error.getMessage(), + containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found") + ); + } finally { + IOUtils.close(searcher); + } + } else { + fromSeqNo = randomLongBetween(0, refreshedSeqNo); + toSeqNo = randomLongBetween(refreshedSeqNo + 1, numOps * 2); + Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + try ( + Translog.Snapshot snapshot = newRandomSnapshot( + engine.engineConfig.getMapperService().mappingLookup(), + searcher, + between(1, SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE), + fromSeqNo, + toSeqNo, + false, + randomBoolean(), + randomBoolean(), + IndexVersion.current() + ) + ) { + searcher = null; + assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, refreshedSeqNo)); + } finally { + IOUtils.close(searcher); + } + searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + try ( + Translog.Snapshot snapshot = newRandomSnapshot( + engine.engineConfig.getMapperService().mappingLookup(), + searcher, + between(1, SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE), + fromSeqNo, + toSeqNo, + true, + randomBoolean(), + randomBoolean(), + IndexVersion.current() + ) + ) { + searcher = null; + IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); + assertThat( + error.getMessage(), + containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found") + ); + } finally { + IOUtils.close(searcher); + } + toSeqNo = randomLongBetween(fromSeqNo, refreshedSeqNo); + searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + try ( + Translog.Snapshot snapshot = newRandomSnapshot( + engine.engineConfig.getMapperService().mappingLookup(), + searcher, + between(1, SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE), + fromSeqNo, + toSeqNo, + true, + randomBoolean(), + randomBoolean(), + IndexVersion.current() + ) + ) { + searcher = null; + assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo)); + } finally { + IOUtils.close(searcher); + } + } + // Get snapshot via engine will auto refresh + fromSeqNo = randomLongBetween(0, numOps - 1); + toSeqNo = randomLongBetween(fromSeqNo, numOps - 1); + try ( + Translog.Snapshot snapshot = engine.newChangesSnapshot( + "test", + fromSeqNo, + toSeqNo, + randomBoolean(), + randomBoolean(), + randomBoolean() + ) + ) { + assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo)); + } + } + + /** + * A nested document is indexed into Lucene as multiple documents. While the root document has both sequence number and primary term, + * non-root documents don't have primary term but only sequence numbers. This test verifies that {@link LuceneChangesSnapshot} + * correctly skip non-root documents and returns at most one operation per sequence number. 
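[Editor's aside] The behaviour described in the javadoc above rests on the snapshot's own range query, which intersects the seq_no range with the non-nested filter so nested children never become operations. The sketch below mirrors SearchBasedChangesSnapshot#rangeQuery; only the class and method names are invented:

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;

import java.io.IOException;

// Illustration: count only root (non-nested) documents within a seq_no range.
final class RootDocCountExample {
    static long countRootOps(IndexSearcher searcher, long fromSeqNo, long toSeqNo) throws IOException {
        Query rootOpsOnly = new BooleanQuery.Builder()
            .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST)
            .add(Queries.newNonNestedFilter(IndexVersion.current()), BooleanClause.Occur.MUST)
            .build();
        return searcher.count(rootOpsOnly);    // nested children are excluded by the non-nested filter
    }
}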
+ */ + public void testSkipNonRootOfNestedDocuments() throws Exception { + Map seqNoToTerm = new HashMap<>(); + List operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean()); + for (Engine.Operation op : operations) { + if (engine.getLocalCheckpointTracker().hasProcessed(op.seqNo()) == false) { + seqNoToTerm.put(op.seqNo(), op.primaryTerm()); + } + applyOperation(engine, op); + if (rarely()) { + engine.refresh("test"); + } + if (rarely()) { + engine.rollTranslogGeneration(); + } + if (rarely()) { + engine.flush(); + } + } + long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo(); + engine.refresh("test"); + Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + final boolean accessStats = randomBoolean(); + try ( + Translog.Snapshot snapshot = newRandomSnapshot( + engine.engineConfig.getMapperService().mappingLookup(), + searcher, + between(1, 100), + 0, + maxSeqNo, + false, + randomBoolean(), + accessStats, + IndexVersion.current() + ) + ) { + if (accessStats) { + assertThat(snapshot.totalOperations(), equalTo(seqNoToTerm.size())); + } + Translog.Operation op; + while ((op = snapshot.next()) != null) { + assertThat(op.toString(), op.primaryTerm(), equalTo(seqNoToTerm.get(op.seqNo()))); + } + assertThat(snapshot.skippedOperations(), equalTo(0)); + } + } + + public void testUpdateAndReadChangesConcurrently() throws Exception { + Follower[] followers = new Follower[between(1, 3)]; + CountDownLatch readyLatch = new CountDownLatch(followers.length + 1); + AtomicBoolean isDone = new AtomicBoolean(); + for (int i = 0; i < followers.length; i++) { + followers[i] = new Follower(engine, isDone, readyLatch); + followers[i].start(); + } + boolean onPrimary = randomBoolean(); + List operations = new ArrayList<>(); + int numOps = frequently() ? scaledRandomIntBetween(1, 1500) : scaledRandomIntBetween(5000, 20_000); + for (int i = 0; i < numOps; i++) { + String id = Integer.toString(randomIntBetween(0, randomBoolean() ? 
10 : numOps * 2)); + ParsedDocument doc = parseDocument(engine.engineConfig.getMapperService(), id, randomAlphaOfLengthBetween(1, 5)); + final Engine.Operation op; + if (onPrimary) { + if (randomBoolean()) { + op = new Engine.Index(newUid(doc), primaryTerm.get(), doc); + } else { + op = new Engine.Delete(doc.id(), Uid.encodeId(doc.id()), primaryTerm.get()); + } + } else { + if (randomBoolean()) { + op = replicaIndexForDoc(doc, randomNonNegativeLong(), i, randomBoolean()); + } else { + op = replicaDeleteForDoc(doc.id(), randomNonNegativeLong(), i, randomNonNegativeLong()); + } + } + operations.add(op); + } + readyLatch.countDown(); + readyLatch.await(); + Randomness.shuffle(operations); + concurrentlyApplyOps(operations, engine); + assertThat(engine.getLocalCheckpointTracker().getProcessedCheckpoint(), equalTo(operations.size() - 1L)); + isDone.set(true); + for (Follower follower : followers) { + follower.join(); + IOUtils.close(follower.engine, follower.engine.store); + } + } + + class Follower extends Thread { + private final InternalEngine leader; + private final InternalEngine engine; + private final TranslogHandler translogHandler; + private final AtomicBoolean isDone; + private final CountDownLatch readLatch; + + Follower(InternalEngine leader, AtomicBoolean isDone, CountDownLatch readLatch) throws IOException { + this.leader = leader; + this.isDone = isDone; + this.readLatch = readLatch; + this.engine = createEngine(defaultSettings, createStore(), createTempDir(), newMergePolicy()); + this.translogHandler = new TranslogHandler(engine.engineConfig.getMapperService()); + } + + void pullOperations(InternalEngine follower) throws IOException { + long leaderCheckpoint = leader.getLocalCheckpointTracker().getProcessedCheckpoint(); + long followerCheckpoint = follower.getLocalCheckpointTracker().getProcessedCheckpoint(); + if (followerCheckpoint < leaderCheckpoint) { + long fromSeqNo = followerCheckpoint + 1; + long batchSize = randomLongBetween(0, 100); + long toSeqNo = Math.min(fromSeqNo + batchSize, leaderCheckpoint); + try ( + Translog.Snapshot snapshot = leader.newChangesSnapshot( + "test", + fromSeqNo, + toSeqNo, + true, + randomBoolean(), + randomBoolean() + ) + ) { + translogHandler.run(follower, snapshot); + } + } + } + + @Override + public void run() { + try { + readLatch.countDown(); + readLatch.await(); + while (isDone.get() == false + || engine.getLocalCheckpointTracker().getProcessedCheckpoint() < leader.getLocalCheckpointTracker() + .getProcessedCheckpoint()) { + pullOperations(engine); + } + assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine); + // have to verify without source since we are randomly testing without _source + List<DocIdSeqNoAndSource> docsWithoutSourceOnFollower = getDocIds(engine, true).stream() + .map(d -> new DocIdSeqNoAndSource(d.id(), null, d.seqNo(), d.primaryTerm(), d.version())) + .toList(); + List<DocIdSeqNoAndSource> docsWithoutSourceOnLeader = getDocIds(leader, true).stream() + .map(d -> new DocIdSeqNoAndSource(d.id(), null, d.seqNo(), d.primaryTerm(), d.version())) + .toList(); + assertThat(docsWithoutSourceOnFollower, equalTo(docsWithoutSourceOnLeader)); + } catch (Exception ex) { + throw new AssertionError(ex); + } + } + } + + private List<Translog.Operation> drainAll(Translog.Snapshot snapshot) throws IOException { + List<Translog.Operation> operations = new ArrayList<>(); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + final Translog.Operation newOp = op; + logger.trace("Reading [{}]", op); + assert operations.stream().allMatch(o -> o.seqNo() < newOp.seqNo()) : "Operations [" + operations + 
"], op [" + op + "]"; + operations.add(newOp); + } + return operations; + } + + public void testOverFlow() throws Exception { + long fromSeqNo = randomLongBetween(0, 5); + long toSeqNo = randomLongBetween(Long.MAX_VALUE - 5, Long.MAX_VALUE); + try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean(), randomBoolean())) { + IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); + assertThat( + error.getMessage(), + containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found") + ); + } + } + + public void testStats() throws Exception { + try (Store store = createStore(); Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) { + int numOps = between(100, 5000); + long startingSeqNo = randomLongBetween(0, Integer.MAX_VALUE); + List<Engine.Operation> operations = generateHistoryOnReplica( + numOps, + startingSeqNo, + randomBoolean(), + randomBoolean(), + randomBoolean() + ); + applyOperations(engine, operations); + + LongSupplier fromSeqNo = () -> { + if (randomBoolean()) { + return 0L; + } else if (randomBoolean()) { + return startingSeqNo; + } else { + return randomLongBetween(0, startingSeqNo); + } + }; + + LongSupplier toSeqNo = () -> { + final long maxSeqNo = engine.getSeqNoStats(-1).getMaxSeqNo(); + if (randomBoolean()) { + return maxSeqNo; + } else if (randomBoolean()) { + return Long.MAX_VALUE; + } else { + return randomLongBetween(maxSeqNo, Long.MAX_VALUE); + } + }; + // Can't access stats if they weren't requested + try ( + Translog.Snapshot snapshot = engine.newChangesSnapshot( + "test", + fromSeqNo.getAsLong(), + toSeqNo.getAsLong(), + false, + randomBoolean(), + false + ) + ) { + IllegalStateException error = expectThrows(IllegalStateException.class, snapshot::totalOperations); + assertThat(error.getMessage(), equalTo("Access stats of a snapshot created with [access_stats] is false")); + final List<Translog.Operation> translogOps = drainAll(snapshot); + assertThat(translogOps, hasSize(numOps)); + error = expectThrows(IllegalStateException.class, snapshot::totalOperations); + assertThat(error.getMessage(), equalTo("Access stats of a snapshot created with [access_stats] is false")); + } + // Access stats and operations + try ( + Translog.Snapshot snapshot = engine.newChangesSnapshot( + "test", + fromSeqNo.getAsLong(), + toSeqNo.getAsLong(), + false, + randomBoolean(), + true + ) + ) { + assertThat(snapshot.totalOperations(), equalTo(numOps)); + final List<Translog.Operation> translogOps = drainAll(snapshot); + assertThat(translogOps, hasSize(numOps)); + assertThat(snapshot.totalOperations(), equalTo(numOps)); + } + // Verify count + assertThat(engine.countChanges("test", fromSeqNo.getAsLong(), toSeqNo.getAsLong()), equalTo(numOps)); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/mapper/SourceFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/SourceFieldMapperTests.java index df6d9380fd141..7bd2c2aa86327 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/SourceFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/SourceFieldMapperTests.java @@ -406,10 +406,23 @@ public void testRecoverySourceWithSyntheticSource() throws IOException { topMapping(b -> b.startObject(SourceFieldMapper.NAME).field("mode", "synthetic").endObject()) ); DocumentMapper docMapper = mapperService.documentMapper(); - ParsedDocument doc = docMapper.parse(source(b -> { b.field("field1", "value1"); })); + 
ParsedDocument doc = docMapper.parse(source(b -> b.field("field1", "value1"))); assertNotNull(doc.rootDoc().getField("_recovery_source")); assertThat(doc.rootDoc().getField("_recovery_source").binaryValue(), equalTo(new BytesRef("{\"field1\":\"value1\"}"))); } + { + Settings settings = Settings.builder() + .put(IndexSettings.INDICES_RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING.getKey(), true) + .build(); + MapperService mapperService = createMapperService( + settings, + topMapping(b -> b.startObject(SourceFieldMapper.NAME).field("mode", "synthetic").endObject()) + ); + DocumentMapper docMapper = mapperService.documentMapper(); + ParsedDocument doc = docMapper.parse(source(b -> b.field("field1", "value1"))); + assertNotNull(doc.rootDoc().getField("_recovery_source_size")); + assertThat(doc.rootDoc().getField("_recovery_source_size").numericValue(), equalTo(19L)); + } { Settings settings = Settings.builder().put(INDICES_RECOVERY_SOURCE_ENABLED_SETTING.getKey(), false).build(); MapperService mapperService = createMapperService( @@ -431,6 +444,17 @@ public void testRecoverySourceWithLogs() throws IOException { assertNotNull(doc.rootDoc().getField("_recovery_source")); assertThat(doc.rootDoc().getField("_recovery_source").binaryValue(), equalTo(new BytesRef("{\"@timestamp\":\"2012-02-13\"}"))); } + { + Settings settings = Settings.builder() + .put(IndexSettings.MODE.getKey(), IndexMode.LOGSDB.getName()) + .put(IndexSettings.INDICES_RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING.getKey(), true) + .build(); + MapperService mapperService = createMapperService(settings, mapping(b -> {})); + DocumentMapper docMapper = mapperService.documentMapper(); + ParsedDocument doc = docMapper.parse(source(b -> { b.field("@timestamp", "2012-02-13"); })); + assertNotNull(doc.rootDoc().getField("_recovery_source_size")); + assertThat(doc.rootDoc().getField("_recovery_source_size").numericValue(), equalTo(27L)); + } { Settings settings = Settings.builder() .put(IndexSettings.MODE.getKey(), IndexMode.LOGSDB.getName()) diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 9e7f5fbbce1a3..ca616dc619ec9 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -158,7 +158,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { System::nanoTime, null, true, - null + EngineTestCase.createMapperService() ); engine = new InternalEngine(config); EngineTestCase.recoverFromTranslog(engine, (e, s) -> 0, Long.MAX_VALUE); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 87c566d543d0f..a24988dee086d 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -57,10 +57,8 @@ import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.Randomness; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.Versions; import 
org.elasticsearch.common.settings.Settings; @@ -142,6 +140,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -160,6 +159,8 @@ public abstract class EngineTestCase extends ESTestCase { protected Store store; protected Store storeReplica; + protected MapperService mapperService; + protected InternalEngine engine; protected InternalEngine replicaEngine; @@ -198,6 +199,27 @@ protected Settings indexSettings() { .build(); } + protected String defaultMapping() { + return """ + { + "dynamic": false, + "properties": { + "value": { + "type": "keyword" + }, + "nested_field": { + "type": "nested", + "properties": { + "field-0": { + "type": "keyword" + } + } + } + } + } + """; + } + @Override @Before public void setUp() throws Exception { @@ -212,15 +234,16 @@ public void setUp() throws Exception { } else { codecName = "default"; } - defaultSettings = IndexSettingsModule.newIndexSettings("test", indexSettings()); + defaultSettings = IndexSettingsModule.newIndexSettings("index", indexSettings()); threadPool = new TestThreadPool(getClass().getName()); store = createStore(); storeReplica = createStore(); Lucene.cleanLuceneIndex(store.directory()); Lucene.cleanLuceneIndex(storeReplica.directory()); primaryTranslogDir = createTempDir("translog-primary"); - translogHandler = createTranslogHandler(defaultSettings); - engine = createEngine(store, primaryTranslogDir); + mapperService = createMapperService(defaultSettings.getSettings(), defaultMapping()); + translogHandler = createTranslogHandler(mapperService); + engine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy()); LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig(); assertEquals(engine.config().getCodec().getName(), codecService.codec(codecName).getName()); @@ -230,7 +253,7 @@ public void setUp() throws Exception { engine.config().setEnableGcDeletes(false); } replicaTranslogDir = createTempDir("translog-replica"); - replicaEngine = createEngine(storeReplica, replicaTranslogDir); + replicaEngine = createEngine(defaultSettings, storeReplica, replicaTranslogDir, newMergePolicy()); currentIndexWriterConfig = replicaEngine.getCurrentIndexWriterConfig(); assertEquals(replicaEngine.config().getCodec().getName(), codecService.codec(codecName).getName()); @@ -433,37 +456,9 @@ protected static ParsedDocument testParsedDocument( ); } - public static CheckedBiFunction nestedParsedDocFactory() throws Exception { - final MapperService mapperService = createMapperService(); - final String nestedMapping = Strings.toString( - XContentFactory.jsonBuilder() - .startObject() - .startObject("type") - .startObject("properties") - .startObject("nested_field") - .field("type", "nested") - .endObject() - .endObject() - .endObject() - .endObject() - ); - final DocumentMapper nestedMapper = mapperService.merge( - "type", - new CompressedXContent(nestedMapping), - MapperService.MergeReason.MAPPING_UPDATE - ); - return (docId, nestedFieldValues) -> { - final XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("field", "value"); - if (nestedFieldValues > 
0) { - XContentBuilder nestedField = source.startObject("nested_field"); - for (int i = 0; i < nestedFieldValues; i++) { - nestedField.field("field-" + i, "value-" + i); - } - source.endObject(); - } - source.endObject(); - return nestedMapper.parse(new SourceToParse(docId, BytesReference.bytes(source), XContentType.JSON)); - }; + public static ParsedDocument parseDocument(MapperService mapperService, String id, String routing) { + SourceToParse sourceToParse = new SourceToParse(id, new BytesArray("{ \"value\" : \"test\" }"), XContentType.JSON, routing); + return mapperService.documentMapper().parse(sourceToParse); } protected Store createStore() throws IOException { @@ -500,8 +495,8 @@ protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSup ); } - protected TranslogHandler createTranslogHandler(IndexSettings indexSettings) { - return new TranslogHandler(xContentRegistry(), indexSettings); + protected TranslogHandler createTranslogHandler(MapperService mapperService) { + return new TranslogHandler(mapperService); } protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { @@ -857,7 +852,7 @@ public EngineConfig config( this::relativeTimeInNanos, indexCommitListener, true, - null + mapperService ); } @@ -1031,6 +1026,22 @@ public static List generateSingleDocHistory( return ops; } + private CheckedBiFunction nestedParsedDocFactory(MapperService mapperService) { + final DocumentMapper nestedMapper = mapperService.documentMapper(); + return (docId, nestedFieldValues) -> { + final XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("value", "test"); + if (nestedFieldValues > 0) { + XContentBuilder nestedField = source.startObject("nested_field"); + for (int i = 0; i < nestedFieldValues; i++) { + nestedField.field("field-" + i, "value-" + i); + } + source.endObject(); + } + source.endObject(); + return nestedMapper.parse(new SourceToParse(docId, BytesReference.bytes(source), XContentType.JSON)); + }; + } + public List generateHistoryOnReplica( int numOps, boolean allowGapInSeqNo, @@ -1050,7 +1061,9 @@ public List generateHistoryOnReplica( long seqNo = startingSeqNo; final int maxIdValue = randomInt(numOps * 2); final List operations = new ArrayList<>(numOps); - CheckedBiFunction nestedParsedDocFactory = nestedParsedDocFactory(); + CheckedBiFunction nestedParsedDocFactory = nestedParsedDocFactory( + engine.engineConfig.getMapperService() + ); for (int i = 0; i < numOps; i++) { final String id = Integer.toString(randomInt(maxIdValue)); final Engine.Operation.TYPE opType = randomFrom(Engine.Operation.TYPE.values()); @@ -1059,7 +1072,9 @@ public List generateHistoryOnReplica( final long startTime = threadPool.relativeTimeInNanos(); final int copies = allowDuplicate && rarely() ? between(2, 4) : 1; for (int copy = 0; copy < copies; copy++) { - final ParsedDocument doc = isNestedDoc ? nestedParsedDocFactory.apply(id, nestedValues) : createParsedDoc(id, null); + final ParsedDocument doc = isNestedDoc + ? 
nestedParsedDocFactory.apply(id, nestedValues) + : parseDocument(engine.engineConfig.getMapperService(), id, null); switch (opType) { case INDEX -> operations.add( new Engine.Index( @@ -1345,7 +1360,16 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e assertThat(luceneOp.toString(), luceneOp.primaryTerm(), equalTo(translogOp.primaryTerm())); assertThat(luceneOp.opType(), equalTo(translogOp.opType())); if (luceneOp.opType() == Translog.Operation.Type.INDEX) { - assertThat(((Translog.Index) luceneOp).source(), equalTo(((Translog.Index) translogOp).source())); + if (engine.engineConfig.getMapperService().mappingLookup().isSourceSynthetic() + && engine.engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled()) { + assertToXContentEquivalent( + ((Translog.Index) luceneOp).source(), + ((Translog.Index) translogOp).source(), + XContentFactory.xContentType(((Translog.Index) luceneOp).source().array()) + ); + } else { + assertThat(((Translog.Index) luceneOp).source(), equalTo(((Translog.Index) translogOp).source())); + } } } } @@ -1401,15 +1425,19 @@ public static void assertAtMostOneLuceneDocumentPerSequenceNumber(IndexSettings } public static MapperService createMapperService() throws IOException { - IndexMetadata indexMetadata = IndexMetadata.builder("test") - .settings(indexSettings(1, 1).put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())) - .putMapping("{\"properties\": {}}") + return createMapperService(Settings.EMPTY, "{}"); + } + + public static MapperService createMapperService(Settings settings, String mappings) throws IOException { + IndexMetadata indexMetadata = IndexMetadata.builder("index") + .settings(indexSettings(1, 1).put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).put(settings)) + .putMapping(mappings) .build(); MapperService mapperService = MapperTestUtils.newMapperService( new NamedXContentRegistry(ClusterModule.getNamedXWriteables()), createTempDir(), - Settings.EMPTY, - "test" + indexMetadata.getSettings(), + "index" ); mapperService.merge(indexMetadata, MapperService.MergeReason.MAPPING_UPDATE); return mapperService; diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java index 57cca12f99c41..33c745de25438 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java @@ -43,6 +43,10 @@ long appliedOperations() { return appliedOperations.get(); } + public TranslogHandler(MapperService mapperService) { + this.mapperService = mapperService; + } + public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings) { SimilarityService similarityService = new SimilarityService(indexSettings, null, emptyMap()); MapperRegistry mapperRegistry = new IndicesModule(emptyList()).getMapperRegistry(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 150eddf039cec..8cca49223ed58 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -15,6 +15,9 @@ import org.elasticsearch.common.CheckedBiConsumer; import 
org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.TimeValue; @@ -31,7 +34,10 @@ import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.TranslogHandler; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -44,6 +50,9 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentType; import java.io.IOException; import java.nio.file.Path; @@ -85,7 +94,7 @@ public void setUp() throws Exception { index = new Index("index", "uuid"); shardId = new ShardId(index, 0); primaryTerm.set(randomLongBetween(1, Long.MAX_VALUE)); - indexMode = randomFrom(IndexMode.values()); + indexMode = IndexMode.TIME_SERIES;// randomFrom(IndexMode.values()); } @Override @@ -94,7 +103,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testFollowingEngineRejectsNonFollowingIndex() { + public void testFollowingEngineRejectsNonFollowingIndex() throws IOException { final Settings.Builder builder = indexSettings(IndexVersion.current(), 1, 0); if (randomBoolean()) { builder.put("index.xpack.ccr.following_index", false); @@ -212,7 +221,7 @@ private EngineConfig engineConfig( final IndexSettings indexSettings, final ThreadPool threadPool, final Store store - ) { + ) throws IOException { final IndexWriterConfig indexWriterConfig = newIndexWriterConfig(); final Path translogPath = createTempDir("translog"); final TranslogConfig translogConfig = new TranslogConfig( @@ -221,6 +230,7 @@ private EngineConfig engineConfig( indexSettings, BigArrays.NON_RECYCLING_INSTANCE ); + final MapperService mapperService = EngineTestCase.createMapperService(); return new EngineConfig( shardIdValue, threadPool, @@ -253,7 +263,7 @@ public void onFailedEngine(String reason, Exception e) { System::nanoTime, null, true, - null + mapperService ); } @@ -689,6 +699,39 @@ public void close() throws IOException { }; } + private CheckedBiFunction nestedParsedDocFactory() throws Exception { + final MapperService mapperService = EngineTestCase.createMapperService(); + final String nestedMapping = Strings.toString( + XContentFactory.jsonBuilder() + .startObject() + .startObject("type") + .startObject("properties") + .startObject("nested_field") + .field("type", "nested") + .endObject() + .endObject() + .endObject() + .endObject() + ); + final DocumentMapper nestedMapper = mapperService.merge( + "type", + new CompressedXContent(nestedMapping), + MapperService.MergeReason.MAPPING_UPDATE + ); + return (docId, nestedFieldValues) -> { + final XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("field", "value"); + if (nestedFieldValues > 0) { + XContentBuilder 
nestedField = source.startObject("nested_field"); + for (int i = 0; i < nestedFieldValues; i++) { + nestedField.field("field-" + i, "value-" + i); + } + source.endObject(); + } + source.endObject(); + return nestedMapper.parse(new SourceToParse(docId, BytesReference.bytes(source), XContentType.JSON)); + }; + } + public void testProcessOnceOnPrimary() throws Exception { final Settings.Builder settingsBuilder = indexSettings(IndexVersion.current(), 1, 0).put("index.xpack.ccr.following_index", true); switch (indexMode) { @@ -709,7 +752,7 @@ public void testProcessOnceOnPrimary() throws Exception { final Settings settings = settingsBuilder.build(); final IndexMetadata indexMetadata = IndexMetadata.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); - final CheckedBiFunction nestedDocFunc = EngineTestCase.nestedParsedDocFactory(); + final CheckedBiFunction nestedDocFunc = nestedParsedDocFactory(); int numOps = between(10, 100); List operations = new ArrayList<>(numOps); for (int i = 0; i < numOps; i++) { diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/SyntheticSourceIndexSettingsProviderTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/SyntheticSourceIndexSettingsProviderTests.java index 2ab77b38b3373..e3dbc2cdedcd2 100644 --- a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/SyntheticSourceIndexSettingsProviderTests.java +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/SyntheticSourceIndexSettingsProviderTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.MapperTestUtils; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.license.MockLicenseState; @@ -269,6 +270,7 @@ public void testGetAdditionalIndexSettingsDowngradeFromSyntheticSource() throws .build(); Settings result = provider.getAdditionalIndexSettings( + IndexVersion.current(), DataStream.getDefaultBackingIndexName(dataStreamName, 2), dataStreamName, null, @@ -281,6 +283,7 @@ public void testGetAdditionalIndexSettingsDowngradeFromSyntheticSource() throws syntheticSourceLicenseService.setSyntheticSourceFallback(true); result = provider.getAdditionalIndexSettings( + IndexVersion.current(), DataStream.getDefaultBackingIndexName(dataStreamName, 2), dataStreamName, null, @@ -293,6 +296,7 @@ public void testGetAdditionalIndexSettingsDowngradeFromSyntheticSource() throws assertEquals(SourceFieldMapper.Mode.STORED, SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING.get(result)); result = provider.getAdditionalIndexSettings( + IndexVersion.current(), DataStream.getDefaultBackingIndexName(dataStreamName, 2), dataStreamName, IndexMode.TIME_SERIES, @@ -305,6 +309,7 @@ public void testGetAdditionalIndexSettingsDowngradeFromSyntheticSource() throws assertEquals(SourceFieldMapper.Mode.STORED, SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING.get(result)); result = provider.getAdditionalIndexSettings( + IndexVersion.current(), DataStream.getDefaultBackingIndexName(dataStreamName, 2), dataStreamName, IndexMode.LOGSDB, @@ -338,6 +343,7 @@ public void testGetAdditionalIndexSettingsDowngradeFromSyntheticSourceFileMatch( ); Metadata metadata = mb.build(); Settings result = provider.getAdditionalIndexSettings( + IndexVersion.current(), 
DataStream.getDefaultBackingIndexName(dataStreamName, 2), dataStreamName, null, @@ -361,6 +367,7 @@ public void testGetAdditionalIndexSettingsDowngradeFromSyntheticSourceFileMatch( metadata = mb.build(); result = provider.getAdditionalIndexSettings( + IndexVersion.current(), DataStream.getDefaultBackingIndexName(dataStreamName, 2), dataStreamName, null, @@ -373,6 +380,7 @@ public void testGetAdditionalIndexSettingsDowngradeFromSyntheticSourceFileMatch( assertEquals(SourceFieldMapper.Mode.STORED, SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING.get(result)); result = provider.getAdditionalIndexSettings( + IndexVersion.current(), DataStream.getDefaultBackingIndexName(dataStreamName, 2), dataStreamName, null,