From f9e98f7259fd891b497499d93a1c639b405610bc Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Wed, 26 Jun 2024 13:28:34 +0100 Subject: [PATCH] first try at using synthetic source in Lucene based recovery --- .../LuceneChangesSnapshotBenchmark.java | 253 ++++++++++++++++++ .../elasticsearch/index/engine/Engine.java | 3 +- .../index/engine/InternalEngine.java | 2 + .../index/engine/LuceneChangesSnapshot.java | 145 +++++----- .../index/engine/ReadOnlyEngine.java | 3 +- .../index/mapper/SourceFieldMapper.java | 8 +- .../vectors/DenseVectorFieldMapper.java | 1 + .../elasticsearch/index/shard/IndexShard.java | 2 +- .../index/engine/InternalEngineTests.java | 1 + .../engine/LuceneChangesSnapshotTests.java | 93 +------ .../index/engine/EngineTestCase.java | 2 +- 11 files changed, 353 insertions(+), 160 deletions(-) create mode 100644 benchmarks/src/main/java/org/elasticsearch/benchmark/engine/LuceneChangesSnapshotBenchmark.java diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/engine/LuceneChangesSnapshotBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/engine/LuceneChangesSnapshotBenchmark.java new file mode 100644 index 0000000000000..cacd16f877af5 --- /dev/null +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/engine/LuceneChangesSnapshotBenchmark.java @@ -0,0 +1,253 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.benchmark.engine; + +import org.apache.lucene.document.StoredValue; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryCachingPolicy; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; +import org.apache.lucene.search.similarities.BM25Similarity; +import org.apache.lucene.store.FSDirectory; +import org.elasticsearch.ElasticsearchGenerationException; +import org.elasticsearch.benchmark.index.mapper.MapperServiceFactory; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.logging.LogConfigurator; +import org.elasticsearch.core.IOUtils; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.LuceneChangesSnapshot; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentType; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import 
org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +@Fork(value = 1) +@Warmup(iterations = 3, time = 3) +@Measurement(iterations = 5, time = 3) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Thread) +public class LuceneChangesSnapshotBenchmark { + @Param({ "10000" }) + int numDocs; + + @Param({ "1000" }) + int numFields; + + @Param({ "10" }) + int numFieldsPerDoc; + + @Param({"true", "false"}) + boolean isSynthetic; + + @Param({ "12345" }) + long seed; + + static final int NUM_OPS = 1000; + + static Random rand = new Random(1234L); + + Path path; + MapperService mapperService; + FSDirectory dir; + IndexReader reader; + Engine.Searcher searcher; + LuceneChangesSnapshot snapshot; + + static int MAX_TERMS = 10000; + static String[] randomStrings = new String[MAX_TERMS]; + + static { + LogConfigurator.configureESLogging(); // native access requires logging to be initialized + for (int i = 0; i < MAX_TERMS; i++) { + randomStrings[i] = randomString(rand, 10, 50); + } + } + + @Setup + public void setup() throws IOException { + this.path = Files.createTempDirectory("snapshot_changes"); + + Map fields = randomFields(); + String mapping = createMapping(fields, isSynthetic); + this.mapperService = MapperServiceFactory.create(mapping); + System.out.println("synthetic=" + mapperService.mappingLookup().isSourceSynthetic()); + IndexWriterConfig config = new IndexWriterConfig(); + //config.setIndexSort(new Sort(new SortField[] { new SortedNumericSortField("random", SortField.Type.INT) })); + try ( + FSDirectory dir = FSDirectory.open(path); + IndexWriter writer = new IndexWriter(dir, config); + ) { + for (int i = 0; i < numDocs; i++) { + SourceToParse sourceToParse = randomSource(UUID.randomUUID().toString(), fields); + ParsedDocument doc = mapperService.documentMapper().parse(sourceToParse); + doc.updateSeqID(i, 0); + doc.version().setLongValue(0); + writer.addDocuments(doc.docs()); + } + } + this.dir = FSDirectory.open(path); + this.reader = DirectoryReader.open(dir); + long size = 0; + for (var file : dir.listAll()) { + size += dir.fileLength(file); + } + this.searcher = new Engine.Searcher("snapshot", reader, new BM25Similarity(), null, new QueryCachingPolicy() { + @Override + public void onUse(Query query) {} + + @Override + public boolean shouldCache(Query query) throws IOException { + return false; + } + }, () -> { + }); + this.snapshot = new LuceneChangesSnapshot(searcher, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, 0, numDocs-1, true, true, true, IndexVersion.current()); + System.out.println("size=" + size); + } + + @TearDown + public void tearDown() { + try { + for (var file : dir.listAll()) { + dir.deleteFile(file); + } + Files.delete(path); + } catch (IOException e) { + throw new RuntimeException(e); + } + IOUtils.closeWhileHandlingException(searcher, reader, dir); + } + + + @Benchmark + @OperationsPerInvocation(NUM_OPS) + public long recover() throws IOException { + long totalSize = 0; + try (var snapshot = new LuceneChangesSnapshot(isSynthetic ? 
mapperService.mappingLookup() : null, searcher, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, 0, NUM_OPS-1, true, true, true, IndexVersion.current())) {
+            Translog.Operation op;
+            while ((op = snapshot.next()) != null) {
+                totalSize += op.estimateSize();
+            }
+        }
+        return totalSize;
+    }
+
+    final String[] types = new String[] { "keyword", "text", "long", "double" };
+    private String createMapping(Map<String, String> fields, boolean syntheticSource) throws IOException {
+        XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
+        builder.startObject();
+        if (syntheticSource) {
+            builder.startObject("_source");
+            builder.field("mode", "synthetic");
+            builder.endObject();
+        }
+        builder.startObject("properties");
+        builder.startObject("random").field("type", "integer").endObject();
+        for (var entry : fields.entrySet()) {
+            builder.startObject(entry.getKey());
+            builder.field("type", entry.getValue());
+            if (syntheticSource && entry.getValue().equals("text")) {
+                builder.field("store", true);
+            }
+            builder.endObject();
+        }
+        builder.endObject().endObject();
+        return BytesReference.bytes(builder).utf8ToString();
+    }
+
+    private Map<String, String> randomFields() {
+        Map<String, String> fields = new HashMap<>(numFields);
+        for (int i = 0; i < numFields; i++) {
+            String type = types[rand.nextInt(4)];
+            fields.put(randomString(rand, 5, 20), type);
+        }
+        return fields;
+    }
+
+    private SourceToParse randomSource(String id, Map<String, String> fields) {
+        Map<String, Object> source = new HashMap<>(numFieldsPerDoc);
+        source.put("random", rand.nextInt());
+        var fieldList = new ArrayList<>(fields.keySet());
+        for (int i = 0; i < numFieldsPerDoc; i++) {
+            int fieldIndex = rand.nextInt(0, fields.size());
+            if (source.containsKey(fieldList.get(fieldIndex))) {
+                continue;
+            }
+            String type = fields.get(fieldList.get(fieldIndex));
+            Object value = switch (type) {
+                case "text":
+                    yield randomString(rand, 1024, 4096);
+
+                case "keyword":
+                    // cardinality=10000
+                    yield randomStrings[rand.nextInt(0, 10000)];
+
+                case "long":
+                    yield rand.nextLong();
+
+                case "double":
+                    yield rand.nextDouble();
+
+                default:
+                    throw new AssertionError("Unknown type: " + type);
+            };
+            source.put(fieldList.get(fieldIndex), value);
+        }
+        try {
+            XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
+            builder.map(source);
+            builder.flush();
+            return new SourceToParse(id, BytesReference.bytes(builder), XContentType.JSON);
+        } catch (IOException e) {
+            throw new ElasticsearchGenerationException("Failed to generate [" + source + "]", e);
+        }
+    }
+
+    static String randomString(Random rand, int min, int max) {
+        var length = rand.nextInt(min, max);
+        var builder = new StringBuilder(length);
+        for (int i = 0; i < length; i++) {
+            // append a random printable ASCII character (cast to char so a character, not a number, is appended)
+            builder.append((char) (32 + rand.nextInt(94)));
+        }
+        return builder.toString();
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
index 1d62debd77e7f..b4c49f62a1d50 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -923,7 +923,7 @@ public boolean allowSearchIdleOptimization() {
      * @param source the source of the request
      * @param fromSeqNo the start sequence number (inclusive)
      * @param toSeqNo the end sequence number (inclusive)
-     * @see #newChangesSnapshot(String, long, long, boolean, boolean, boolean)
+     * @see #newChangesSnapshot(MappingLookup, String, long, long, boolean, boolean, boolean)
      */
     public abstract int countChanges(String source, long
fromSeqNo, long toSeqNo) throws IOException; @@ -932,6 +932,7 @@ public boolean allowSearchIdleOptimization() { * This feature requires soft-deletes enabled. If soft-deletes are disabled, this method will throw an {@link IllegalStateException}. */ public abstract Translog.Snapshot newChangesSnapshot( + MappingLookup mappingLookup, String source, long fromSeqNo, long toSeqNo, diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index be64365fedd34..5c872b9f3479d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -3109,6 +3109,7 @@ public int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOEx @Override public Translog.Snapshot newChangesSnapshot( + MappingLookup mappingLookup, String source, long fromSeqNo, long toSeqNo, @@ -3121,6 +3122,7 @@ public Translog.Snapshot newChangesSnapshot( Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL); try { LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot( + mappingLookup, searcher, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, fromSeqNo, diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index e63d5ef87973b..735520af386f2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -8,7 +8,6 @@ package org.elasticsearch.index.engine; -import org.apache.lucene.codecs.StoredFieldsReader; import org.apache.lucene.document.LongPoint; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; @@ -24,29 +23,37 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopFieldCollectorManager; import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader; import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.core.IOUtils; import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.index.fieldvisitor.FieldsVisitor; +import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader; +import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; +import org.elasticsearch.index.mapper.MappingLookup; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.SourceFieldMetrics; +import org.elasticsearch.index.mapper.SourceLoader; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.transport.Transports; import java.io.Closeable; import java.io.IOException; +import java.util.Arrays; import java.util.Comparator; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; /** * A {@link Translog.Snapshot} from changes in a Lucene index */ -final class LuceneChangesSnapshot implements Translog.Snapshot { - static final int DEFAULT_BATCH_SIZE = 1024; +public final class LuceneChangesSnapshot implements Translog.Snapshot { + public static final int 
DEFAULT_BATCH_SIZE = 1024; private final int searchBatchSize; private final long fromSeqNo, toSeqNo; @@ -65,11 +72,24 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { private final IndexVersion indexVersionCreated; - private int storedFieldsReaderOrd = -1; - private StoredFieldsReader storedFieldsReader = null; - private final Thread creationThread; // for assertion + private final StoredFieldLoader storedFieldLoader; + private final SourceLoader sourceLoader; + + public LuceneChangesSnapshot( + Engine.Searcher engineSearcher, + int searchBatchSize, + long fromSeqNo, + long toSeqNo, + boolean requiredFullRange, + boolean singleConsumer, + boolean accessStats, + IndexVersion indexVersionCreated + ) throws IOException { + this(null, engineSearcher, searchBatchSize, fromSeqNo, toSeqNo, requiredFullRange, singleConsumer, accessStats, indexVersionCreated); + } + /** * Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range. * @@ -82,7 +102,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { * @param accessStats true if the stats of the snapshot can be accessed via {@link #totalOperations()} * @param indexVersionCreated the version on which this index was created */ - LuceneChangesSnapshot( + public LuceneChangesSnapshot( + MappingLookup mappingLookup, Engine.Searcher engineSearcher, int searchBatchSize, long fromSeqNo, @@ -120,6 +141,14 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { final TopDocs topDocs = searchOperations(null, accessStats); this.totalHits = Math.toIntExact(topDocs.totalHits.value); this.scoreDocs = topDocs.scoreDocs; + if (mappingLookup != null && mappingLookup.isSourceSynthetic()) { + Set storedFields = mappingLookup.getMapping().syntheticFieldLoader().storedFieldLoaders().map(s -> s.getKey()).collect(Collectors.toSet()); + this.storedFieldLoader = StoredFieldLoader.create(false, storedFields); + this.sourceLoader = new SourceLoader.Synthetic(mappingLookup.getMapping()::syntheticFieldLoader, SourceFieldMetrics.NOOP); + } else { + this.storedFieldLoader = StoredFieldLoader.create(true, Set.of(SourceFieldMapper.RECOVERY_SOURCE_NAME)); + this.sourceLoader = null; + } fillParallelArray(scoreDocs, parallelArray); } @@ -224,26 +253,28 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray for (int i = 0; i < scoreDocs.length; i++) { scoreDocs[i].shardIndex = i; } - parallelArray.useSequentialStoredFieldsReader = singleConsumer && scoreDocs.length >= 10 && hasSequentialAccess(scoreDocs); - if (parallelArray.useSequentialStoredFieldsReader == false) { - storedFieldsReaderOrd = -1; - storedFieldsReader = null; - } + // parallelArray.useSequentialStoredFieldsReader = singleConsumer && scoreDocs.length >= 10 && hasSequentialAccess(scoreDocs); + // for better loading performance we sort the array by docID and // then visit all leaves in order. 
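
For context, the constructor change above boils down to choosing one of two loading strategies. A condensed sketch with explanatory comments follows; it reuses the calls from the hunk above, and `mappingLookup` may be null for callers that have no mapping (for example ReadOnlyEngine#countChanges and several tests):

    if (mappingLookup != null && mappingLookup.isSourceSynthetic()) {
        // Synthetic source: load only the stored fields the synthetic loader declares it needs,
        // and rebuild _source per document from doc values / stored fields.
        Set<String> storedFields = mappingLookup.getMapping()
            .syntheticFieldLoader()
            .storedFieldLoaders()
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
        this.storedFieldLoader = StoredFieldLoader.create(false, storedFields);   // false: do not load stored _source
        this.sourceLoader = new SourceLoader.Synthetic(mappingLookup.getMapping()::syntheticFieldLoader, SourceFieldMetrics.NOOP);
    } else {
        // Stored source: load _source plus _recovery_source (present when _source was omitted or
        // modified at index time), as before this change.
        this.storedFieldLoader = StoredFieldLoader.create(true, Set.of(SourceFieldMapper.RECOVERY_SOURCE_NAME));
        this.sourceLoader = null;
    }
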
- if (parallelArray.useSequentialStoredFieldsReader == false) { - ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.doc)); - } + ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.doc)); int docBase = -1; int maxDoc = 0; List leaves = indexSearcher.getIndexReader().leaves(); int readerIndex = 0; CombinedDocValues combinedDocValues = null; LeafReaderContext leaf = null; + LeafStoredFieldLoader leafStoredField = null; + SourceLoader.Leaf leafSourceLoader = null; + System.out.println(Arrays.toString(scoreDocs)); for (ScoreDoc scoreDoc : scoreDocs) { if (scoreDoc.doc >= docBase + maxDoc) { do { leaf = leaves.get(readerIndex++); + leafStoredField = storedFieldLoader.getLoader(leaf, null); + if (sourceLoader != null) { + leafSourceLoader = sourceLoader.leaf(leaf.reader(), null); + } docBase = leaf.docBase; maxDoc = leaf.reader().maxDoc(); } while (scoreDoc.doc >= docBase + maxDoc); @@ -257,23 +288,28 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray parallelArray.version[index] = combinedDocValues.docVersion(segmentDocID); parallelArray.isTombStone[index] = combinedDocValues.isTombstone(segmentDocID); parallelArray.hasRecoverySource[index] = combinedDocValues.hasRecoverySource(segmentDocID); + leafStoredField.advanceTo(segmentDocID); + final BytesReference source; + if (sourceLoader != null) { + assert parallelArray.hasRecoverySource[docIndex] == false; + source = leafSourceLoader.source(leafStoredField, segmentDocID).internalSourceRef(); + } else { + if (parallelArray.hasRecoverySource[docIndex]) { + List recovery = leafStoredField.storedFields().get(SourceFieldMapper.RECOVERY_SOURCE_NAME); + source = new BytesArray((BytesRef) recovery.get(0)); + } else { + source = leafStoredField.source(); + } + } + parallelArray.id[index] = leafStoredField.id(); + parallelArray.routing[index] = leafStoredField.routing(); + parallelArray.source[index] = source; } // now sort back based on the shardIndex. we use this to store the previous index - if (parallelArray.useSequentialStoredFieldsReader == false) { - ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.shardIndex)); - } + ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.shardIndex)); } } - private static boolean hasSequentialAccess(ScoreDoc[] scoreDocs) { - for (int i = 0; i < scoreDocs.length - 1; i++) { - if (scoreDocs[i].doc + 1 != scoreDocs[i + 1].doc) { - return false; - } - } - return true; - } - private static IndexSearcher newIndexSearcher(Engine.Searcher engineSearcher) throws IOException { return new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader())); } @@ -316,45 +352,22 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException { skippedOperations++; return null; } - final long version = parallelArray.version[docIndex]; - final String sourceField = parallelArray.hasRecoverySource[docIndex] - ? 
SourceFieldMapper.RECOVERY_SOURCE_NAME - : SourceFieldMapper.NAME; - final FieldsVisitor fields = new FieldsVisitor(true, sourceField); - - if (parallelArray.useSequentialStoredFieldsReader) { - if (storedFieldsReaderOrd != leaf.ord) { - if (leaf.reader() instanceof SequentialStoredFieldsLeafReader) { - storedFieldsReader = ((SequentialStoredFieldsLeafReader) leaf.reader()).getSequentialStoredFieldsReader(); - storedFieldsReaderOrd = leaf.ord; - } else { - storedFieldsReader = null; - storedFieldsReaderOrd = -1; - } - } - } - if (storedFieldsReader != null) { - assert singleConsumer : "Sequential access optimization must not be enabled for multiple consumers"; - assert parallelArray.useSequentialStoredFieldsReader; - assert storedFieldsReaderOrd == leaf.ord : storedFieldsReaderOrd + " != " + leaf.ord; - storedFieldsReader.document(segmentDocID, fields); - } else { - leaf.reader().document(segmentDocID, fields); - } - final Translog.Operation op; + final long version = parallelArray.version[docIndex]; final boolean isTombstone = parallelArray.isTombStone[docIndex]; - if (isTombstone && fields.id() == null) { - op = new Translog.NoOp(seqNo, primaryTerm, fields.source().utf8ToString()); + final String id = parallelArray.id[docIndex]; + final String routing = parallelArray.routing[docIndex]; + final BytesReference source = parallelArray.source[docIndex]; + final Translog.Operation op; + if (isTombstone && id == null) { + op = new Translog.NoOp(seqNo, primaryTerm, source.utf8ToString()); assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]"; assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Noop but soft_deletes field is not set [" + op + "]"; } else { - final String id = fields.id(); if (isTombstone) { op = new Translog.Delete(id, seqNo, primaryTerm, version); assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + op + "]"; } else { - final BytesReference source = fields.source(); if (source == null) { // TODO: Callers should ask for the range that source should be retained. Thus we should always // check for the existence source once we make peer-recovery to send ops after the local checkpoint. @@ -369,7 +382,7 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException { } // TODO: pass the latest timestamp from engine. 
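
readDocAsOp then turns each hit into a Translog operation from the id, routing and source captured in the parallel arrays. A minimal consumer-side sketch, assuming an `engine`, a `mappingLookup` and a seq_no range from the caller plus hypothetical `replayIndex`/`replayDelete` handlers (none of these names are part of this patch), mirrors EngineTestCase#readAllOperationsInLucene:

    try (Translog.Snapshot snapshot = engine.newChangesSnapshot(mappingLookup, "peer-recovery", fromSeqNo, toSeqNo, true, true, true)) {
        Translog.Operation op;
        while ((op = snapshot.next()) != null) {
            if (op instanceof Translog.Index index) {
                // Live document: source is the stored _source / _recovery_source, or, for a
                // synthetic source index, the _source rebuilt by the SourceLoader.
                replayIndex(index);      // hypothetical handler
            } else if (op instanceof Translog.Delete delete) {
                replayDelete(delete);    // hypothetical handler
            } else {
                // Translog.NoOp: a tombstone without an id, carried over as-is.
            }
        }
    }
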
final long autoGeneratedIdTimestamp = -1; - op = new Translog.Index(id, seqNo, primaryTerm, version, source, fields.routing(), autoGeneratedIdTimestamp); + op = new Translog.Index(id, seqNo, primaryTerm, version, source, routing, autoGeneratedIdTimestamp); } } assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() @@ -401,7 +414,9 @@ private static final class ParallelArray { final long[] primaryTerm; final boolean[] isTombStone; final boolean[] hasRecoverySource; - boolean useSequentialStoredFieldsReader = false; + final String[] id; + final String[] routing; + final BytesReference[] source; ParallelArray(int size) { version = new long[size]; @@ -410,11 +425,9 @@ private static final class ParallelArray { isTombStone = new boolean[size]; hasRecoverySource = new boolean[size]; leafReaderContexts = new LeafReaderContext[size]; + id = new String[size]; + source = new BytesReference[size]; + routing = new String[size]; } } - - // for testing - boolean useSequentialStoredFieldsReader() { - return storedFieldsReader != null; - } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index eda408a9c8fde..a8d6f53e4889d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -355,13 +355,14 @@ public Closeable acquireHistoryRetentionLock() { @Override public int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOException { - try (Translog.Snapshot snapshot = newChangesSnapshot(source, fromSeqNo, toSeqNo, false, true, true)) { + try (Translog.Snapshot snapshot = newChangesSnapshot(null, source, fromSeqNo, toSeqNo, false, true, true)) { return snapshot.totalOperations(); } } @Override public Translog.Snapshot newChangesSnapshot( + MappingLookup mappingLookup, String source, long fromSeqNo, long toSeqNo, diff --git a/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java index 67e457907f8cc..a34601f59668d 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java @@ -354,11 +354,11 @@ public void preParse(DocumentParserContext context) throws IOException { context.doc().add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length)); } - if (originalSource != null && adaptedSource != originalSource) { + if ((indexMode == null || indexMode.isSyntheticSourceEnabled() == false) && originalSource != null && adaptedSource != originalSource) { // if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery - BytesRef ref = originalSource.toBytesRef(); - context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length)); - context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1)); + //BytesRef ref = originalSource.toBytesRef(); + //context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length)); + //context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1)); } } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java index beb64973ff0fb..5cd6d38fae1f5 100644 --- 
a/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java @@ -244,6 +244,7 @@ public Builder dimensions(int dimensions) { @Override public DenseVectorFieldMapper build(MapperBuilderContext context) { + validate(); return new DenseVectorFieldMapper( leafName(), new DenseVectorFieldType( diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index b3f19b1b7a81d..67944a7ca8ef1 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2587,7 +2587,7 @@ public Translog.Snapshot newChangesSnapshot( boolean singleConsumer, boolean accessStats ) throws IOException { - return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange, singleConsumer, accessStats); + return getEngine().newChangesSnapshot(mapperService.mappingLookup(), source, fromSeqNo, toSeqNo, requiredFullRange, singleConsumer, accessStats); } public List segments() { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index a89ac5bc5b74e..32cd1cca719c1 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -6483,6 +6483,7 @@ protected void doRun() throws Exception { if (randomBoolean()) { try ( Translog.Snapshot ignored = engine.newChangesSnapshot( + null, "test", min, max, diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java index 6f568ecf347c4..5e7214da09349 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -48,14 +48,14 @@ public void testBasics() throws Exception { long fromSeqNo = randomNonNegativeLong(); long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE); // Empty engine - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean(), randomBoolean())) { + try (Translog.Snapshot snapshot = engine.newChangesSnapshot(null, "test", fromSeqNo, toSeqNo, true, randomBoolean(), randomBoolean())) { IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); assertThat( error.getMessage(), containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found") ); } - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, false, randomBoolean(), randomBoolean())) { + try (Translog.Snapshot snapshot = engine.newChangesSnapshot(null, "test", fromSeqNo, toSeqNo, false, randomBoolean(), randomBoolean())) { assertThat(snapshot, SnapshotMatchers.size(0)); } int numOps = between(1, 100); @@ -190,6 +190,7 @@ public void testBasics() throws Exception { toSeqNo = randomLongBetween(fromSeqNo, numOps - 1); try ( Translog.Snapshot snapshot = engine.newChangesSnapshot( + null, "test", fromSeqNo, toSeqNo, @@ -294,89 +295,6 @@ public void testUpdateAndReadChangesConcurrently() throws Exception { } } - public void testAccessStoredFieldsSequentially() 
throws Exception { - try (Store store = createStore(); Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) { - int smallBatch = between(5, 9); - long seqNo = 0; - for (int i = 0; i < smallBatch; i++) { - engine.index(replicaIndexForDoc(createParsedDoc(Long.toString(seqNo), null), 1, seqNo, true)); - seqNo++; - } - engine.index(replicaIndexForDoc(createParsedDoc(Long.toString(1000), null), 1, 1000, true)); - seqNo = 11; - int largeBatch = between(15, 100); - for (int i = 0; i < largeBatch; i++) { - engine.index(replicaIndexForDoc(createParsedDoc(Long.toString(seqNo), null), 1, seqNo, true)); - seqNo++; - } - // disable optimization for a small batch - Translog.Operation op; - try ( - LuceneChangesSnapshot snapshot = (LuceneChangesSnapshot) engine.newChangesSnapshot( - "test", - 0L, - between(1, smallBatch), - false, - randomBoolean(), - randomBoolean() - ) - ) { - while ((op = snapshot.next()) != null) { - assertFalse(op.toString(), snapshot.useSequentialStoredFieldsReader()); - } - assertFalse(snapshot.useSequentialStoredFieldsReader()); - } - // disable optimization for non-sequential accesses - try ( - LuceneChangesSnapshot snapshot = (LuceneChangesSnapshot) engine.newChangesSnapshot( - "test", - between(1, 3), - between(20, 100), - false, - randomBoolean(), - randomBoolean() - ) - ) { - while ((op = snapshot.next()) != null) { - assertFalse(op.toString(), snapshot.useSequentialStoredFieldsReader()); - } - assertFalse(snapshot.useSequentialStoredFieldsReader()); - } - // enable optimization for sequential access of 10+ docs - try ( - LuceneChangesSnapshot snapshot = (LuceneChangesSnapshot) engine.newChangesSnapshot( - "test", - 11, - between(21, 100), - false, - true, - randomBoolean() - ) - ) { - while ((op = snapshot.next()) != null) { - assertTrue(op.toString(), snapshot.useSequentialStoredFieldsReader()); - } - assertTrue(snapshot.useSequentialStoredFieldsReader()); - } - // disable optimization if snapshot is accessed by multiple consumers - try ( - LuceneChangesSnapshot snapshot = (LuceneChangesSnapshot) engine.newChangesSnapshot( - "test", - 11, - between(21, 100), - false, - false, - randomBoolean() - ) - ) { - while ((op = snapshot.next()) != null) { - assertFalse(op.toString(), snapshot.useSequentialStoredFieldsReader()); - } - assertFalse(snapshot.useSequentialStoredFieldsReader()); - } - } - } - class Follower extends Thread { private final InternalEngine leader; private final InternalEngine engine; @@ -404,6 +322,7 @@ void pullOperations(InternalEngine follower) throws IOException { long toSeqNo = Math.min(fromSeqNo + batchSize, leaderCheckpoint); try ( Translog.Snapshot snapshot = leader.newChangesSnapshot( + null, "test", fromSeqNo, toSeqNo, @@ -457,7 +376,7 @@ private List drainAll(Translog.Snapshot snapshot) throws IOE public void testOverFlow() throws Exception { long fromSeqNo = randomLongBetween(0, 5); long toSeqNo = randomLongBetween(Long.MAX_VALUE - 5, Long.MAX_VALUE); - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean(), randomBoolean())) { + try (Translog.Snapshot snapshot = engine.newChangesSnapshot(null, "test", fromSeqNo, toSeqNo, true, randomBoolean(), randomBoolean())) { IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); assertThat( error.getMessage(), @@ -502,6 +421,7 @@ public void testStats() throws Exception { // Can't access stats if didn't request it try ( Translog.Snapshot snapshot = 
engine.newChangesSnapshot(
+                null,
                 "test",
                 fromSeqNo.getAsLong(),
                 toSeqNo.getAsLong(),
@@ -520,6 +440,7 @@ public void testStats() throws Exception {
         // Access stats and operations
         try (
             Translog.Snapshot snapshot = engine.newChangesSnapshot(
+                null,
                 "test",
                 fromSeqNo.getAsLong(),
                 toSeqNo.getAsLong(),
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index 1c7cabb541581..8618e6f20f20f 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -1283,7 +1283,7 @@ public static List getDocIds(Engine engine, boolean refresh
      */
     public static List<Translog.Operation> readAllOperationsInLucene(Engine engine) throws IOException {
         final List<Translog.Operation> operations = new ArrayList<>();
-        try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean(), randomBoolean())) {
+        try (Translog.Snapshot snapshot = engine.newChangesSnapshot(null, "test", 0, Long.MAX_VALUE, false, randomBoolean(), randomBoolean())) {
             Translog.Operation op;
             while ((op = snapshot.next()) != null) {
                 operations.add(op);
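
Call sites now thread a MappingLookup through to the snapshot. A rough sketch of the two kinds of callers after this change, borrowing `getEngine`, `mapperService` and the boolean flags from the IndexShard and test hunks above (those names are assumed from that context, not additional code in the patch):

    // Production caller (mirrors IndexShard#newChangesSnapshot): pass the index's MappingLookup so a
    // synthetic-source index can rebuild _source per operation instead of relying on _recovery_source.
    try (Translog.Snapshot snapshot = getEngine().newChangesSnapshot(
            mapperService.mappingLookup(), source, fromSeqNo, toSeqNo, requiredFullRange, singleConsumer, accessStats)) {
        // drain operations ...
    }

    // Test-style caller that does not exercise synthetic source: passing null keeps the stored-source path.
    try (Translog.Snapshot snapshot = engine.newChangesSnapshot(null, "test", 0, Long.MAX_VALUE, false, true, true)) {
        // drain operations ...
    }
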