diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/index/LuceneChangesSnapshotBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/index/LuceneChangesSnapshotBenchmark.java new file mode 100644 index 0000000000000..211e9f223f2a8 --- /dev/null +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/index/LuceneChangesSnapshotBenchmark.java @@ -0,0 +1,198 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.benchmark.index; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryCachingPolicy; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.similarities.BM25Similarity; +import org.apache.lucene.store.FSDirectory; +import org.elasticsearch.benchmark.index.mapper.MapperServiceFactory; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.logging.LogConfigurator; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.core.IOUtils; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.codec.PerFieldMapperCodec; +import org.elasticsearch.index.codec.zstd.Zstd814StoredFieldsFormat; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.LuceneChangesSnapshot; +import org.elasticsearch.index.engine.LuceneChangesSnapshotNew; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.TimeUnit; +import java.util.zip.GZIPInputStream; + +@Fork(value = 1) +@Warmup(iterations = 3, time = 3) +@Measurement(iterations = 5, time = 3) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Thread) +public class LuceneChangesSnapshotBenchmark { + @Param({ "synthetic", "stored" }) + String sourceMode; + + @Param({ "true", "false" }) + boolean sequential; + + static final int NUM_OPS = 
10000; + + Path path; + MapperService mapperService; + FSDirectory dir; + IndexReader reader; + Engine.Searcher searcher; + + static { + LogConfigurator.configureESLogging(); // native access requires logging to be initialized + } + + @Setup + public void setup() throws IOException { + this.path = Files.createTempDirectory("snapshot_changes"); + + this.mapperService = MapperServiceFactory.create(readMappings(sourceMode)); + IndexWriterConfig config = new IndexWriterConfig(); + config.setCodec( + new PerFieldMapperCodec(Zstd814StoredFieldsFormat.Mode.BEST_COMPRESSION, mapperService, BigArrays.NON_RECYCLING_INSTANCE) + ); + if (sequential == false) { + config.setIndexSort(new Sort(new SortField[] { new SortField("_seq_no", SortField.Type.LONG, true) })); + } + try (FSDirectory dir = FSDirectory.open(path); IndexWriter writer = new IndexWriter(dir, config);) { + try ( + var inputStream = readSampleDocs(); + XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, inputStream) + ) { + int id = 0; + // find the hits array + while (parser.nextToken() != XContentParser.Token.START_ARRAY) { + parser.nextToken(); + } + + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + // skip start object + parser.nextToken(); + // skip _source field name + parser.nextToken(); + XContentBuilder source = XContentBuilder.builder(XContentType.JSON.xContent()); + source.copyCurrentStructure(parser); + var sourceBytes = BytesReference.bytes(source); + SourceToParse sourceToParse = new SourceToParse(Integer.toString(id), sourceBytes, XContentType.JSON); + ParsedDocument doc = mapperService.documentMapper().parse(sourceToParse); + doc.updateSeqID(id, 0); + doc.version().setLongValue(0); + writer.addDocuments(doc.docs()); + id++; + parser.nextToken(); + } + } + } + this.dir = FSDirectory.open(path); + this.reader = DirectoryReader.open(dir); + this.searcher = new Engine.Searcher("snapshot", reader, new BM25Similarity(), null, new QueryCachingPolicy() { + @Override + public void onUse(Query query) {} + + @Override + public boolean shouldCache(Query query) throws IOException { + return false; + } + }, () -> {}); + } + + @TearDown + public void tearDown() { + try { + for (var file : dir.listAll()) { + dir.deleteFile(file); + } + Files.delete(path); + } catch (IOException e) { + throw new RuntimeException(e); + } + IOUtils.closeWhileHandlingException(searcher, reader, dir); + } + + @Benchmark + @OperationsPerInvocation(NUM_OPS) + public long recover() throws IOException { + long totalSize = 0; + Translog.Snapshot snapshot = mapperService.mappingLookup().isSourceSynthetic() + ? 
new LuceneChangesSnapshotNew( + mapperService.mappingLookup().newSourceLoader(mapperService.getMapperMetrics().sourceFieldMetrics()), + searcher, + 512, + 0, + NUM_OPS - 1, + true, + true, + true, + IndexVersion.current() + ) + : new LuceneChangesSnapshot( + searcher, + LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, + 0, + NUM_OPS - 1, + true, + true, + true, + IndexVersion.current() + ); + try (snapshot) { + Translog.Operation op; + while ((op = snapshot.next()) != null) { + totalSize += op.estimateSize(); + } + } + return totalSize; + } + + private String readMappings(String mode) throws IOException { + return Streams.readFully(LuceneChangesSnapshotBenchmark.class.getResourceAsStream("lucene-changes-mappings.json")) + .utf8ToString() + .replace("$1", mode); + } + + private InputStream readSampleDocs() throws IOException { + return new GZIPInputStream(LuceneChangesSnapshotBenchmark.class.getResourceAsStream("lucene-changes-sample-docs.json.gz")); + } +} diff --git a/benchmarks/src/main/resources/org/elasticsearch/benchmark/index/lucene-changes-mappings.json b/benchmarks/src/main/resources/org/elasticsearch/benchmark/index/lucene-changes-mappings.json new file mode 100644 index 0000000000000..97928dff318eb --- /dev/null +++ b/benchmarks/src/main/resources/org/elasticsearch/benchmark/index/lucene-changes-mappings.json @@ -0,0 +1,482 @@ +{ + "_meta": { + "managed_by": "fleet", + "managed": true, + "package": { + "name": "kafka" + } + }, + "_data_stream_timestamp": { + "enabled": true + }, + "_source": { + "mode": "$1" + }, + "date_detection": false, + "runtime": { + "rally.doc_size": { + "type": "long" + }, + "rally.message_size": { + "type": "long" + } + }, + "properties": { + "@timestamp": { + "type": "date", + "ignore_malformed": false + }, + "agent": { + "properties": { + "ephemeral_id": { + "type": "keyword", + "ignore_above": 1024 + }, + "hostname": { + "type": "keyword", + "ignore_above": 1024 + }, + "id": { + "type": "keyword", + "ignore_above": 1024 + }, + "name": { + "type": "keyword", + "fields": { + "text": { + "type": "text" + } + } + }, + "type": { + "type": "keyword", + "ignore_above": 1024 + }, + "version": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "cloud": { + "properties": { + "account": { + "properties": { + "id": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "availability_zone": { + "type": "keyword", + "ignore_above": 1024 + }, + "image": { + "properties": { + "id": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "instance": { + "properties": { + "id": { + "type": "keyword", + "ignore_above": 1024 + }, + "name": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "machine": { + "properties": { + "type": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "project": { + "properties": { + "id": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "provider": { + "type": "keyword", + "ignore_above": 1024 + }, + "region": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "container": { + "dynamic": "true", + "properties": { + "id": { + "type": "keyword", + "ignore_above": 1024 + }, + "image": { + "properties": { + "name": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "labels": { + "type": "object", + "dynamic": "true" + }, + "name": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "data_stream": { + "properties": { + "dataset": { + "type": "keyword" + }, + "namespace": { + "type": "keyword" + }, + "type": { + "type": "keyword" + } + } + }, + "ecs": { + "properties": { + 
"version": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "error": { + "properties": { + "log": { + "type": "keyword", + "ignore_above": 1024 + }, + "message": { + "type": "text" + } + } + }, + "event": { + "properties": { + "agent_id_status": { + "type": "keyword", + "ignore_above": 1024 + }, + "created": { + "type": "date", + "format": "strict_date_optional_time" + }, + "dataset": { + "type": "keyword" + }, + "end": { + "type": "date" + }, + "id": { + "type": "keyword", + "ignore_above": 1024 + }, + "ingested": { + "type": "date", + "format": "strict_date_optional_time" + }, + "kind": { + "type": "keyword", + "ignore_above": 1024 + }, + "module": { + "type": "keyword" + }, + "start": { + "type": "date" + }, + "timezone": { + "type": "keyword", + "ignore_above": 1024 + }, + "type": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "fileset": { + "properties": { + "name": { + "type": "keyword", + "fields": { + "text": { + "type": "text" + } + } + } + } + }, + "host": { + "properties": { + "architecture": { + "type": "keyword", + "ignore_above": 1024 + }, + "containerized": { + "type": "boolean" + }, + "domain": { + "type": "keyword", + "ignore_above": 1024 + }, + "hostname": { + "type": "keyword", + "ignore_above": 1024 + }, + "id": { + "type": "keyword", + "ignore_above": 1024 + }, + "ip": { + "type": "ip" + }, + "mac": { + "type": "keyword", + "ignore_above": 1024 + }, + "name": { + "type": "keyword", + "ignore_above": 1024 + }, + "os": { + "properties": { + "build": { + "type": "keyword", + "ignore_above": 1024 + }, + "codename": { + "type": "keyword", + "ignore_above": 1024 + }, + "family": { + "type": "keyword", + "ignore_above": 1024 + }, + "kernel": { + "type": "keyword", + "ignore_above": 1024 + }, + "name": { + "type": "keyword", + "ignore_above": 1024, + "fields": { + "text": { + "type": "text" + } + } + }, + "platform": { + "type": "keyword", + "ignore_above": 1024 + }, + "version": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "type": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "input": { + "properties": { + "type": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "kafka": { + "properties": { + "log": { + "properties": { + "class": { + "type": "keyword", + "ignore_above": 1024 + }, + "component": { + "type": "keyword", + "ignore_above": 1024 + }, + "thread": { + "type": "keyword", + "ignore_above": 1024 + }, + "trace": { + "properties": { + "class": { + "type": "keyword", + "ignore_above": 1024 + }, + "message": { + "type": "text" + } + } + } + } + } + } + }, + "kubernetes": { + "properties": { + "container": { + "properties": { + "image": { + "type": "keyword", + "ignore_above": 1024 + }, + "name": { + "type": "keyword", + "fields": { + "text": { + "type": "text" + } + } + } + } + }, + "labels": { + "properties": { + "app": { + "type": "keyword", + "ignore_above": 1024 + }, + "controller-revision-hash": { + "type": "keyword", + "ignore_above": 1024 + }, + "release": { + "type": "keyword", + "ignore_above": 1024 + }, + "statefulset_kubernetes_io/pod-name": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "namespace": { + "type": "keyword", + "ignore_above": 1024 + }, + "node": { + "properties": { + "name": { + "type": "keyword", + "fields": { + "text": { + "type": "text" + } + } + } + } + }, + "pod": { + "properties": { + "name": { + "type": "keyword", + "fields": { + "text": { + "type": "text" + } + } + }, + "uid": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "statefulset": { + 
"properties": { + "name": { + "type": "keyword", + "fields": { + "text": { + "type": "text" + } + } + } + } + } + } + }, + "log": { + "properties": { + "file": { + "properties": { + "path": { + "type": "keyword", + "fields": { + "text": { + "type": "text" + } + } + } + } + }, + "flags": { + "type": "keyword", + "ignore_above": 1024 + }, + "level": { + "type": "keyword", + "ignore_above": 1024 + }, + "offset": { + "type": "long" + } + } + }, + "message": { + "type": "text" + }, + "rally": { + "type": "object" + }, + "service": { + "properties": { + "type": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "stream": { + "type": "keyword", + "ignore_above": 1024 + }, + "tags": { + "type": "keyword", + "ignore_above": 1024 + } + } +} \ No newline at end of file diff --git a/benchmarks/src/main/resources/org/elasticsearch/benchmark/index/lucene-changes-sample-docs.json.gz b/benchmarks/src/main/resources/org/elasticsearch/benchmark/index/lucene-changes-sample-docs.json.gz new file mode 100644 index 0000000000000..ca8f3539fcbd7 Binary files /dev/null and b/benchmarks/src/main/resources/org/elasticsearch/benchmark/index/lucene-changes-sample-docs.json.gz differ diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneBatchChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneBatchChangesSnapshot.java new file mode 100644 index 0000000000000..f36195082b1ef --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneBatchChangesSnapshot.java @@ -0,0 +1,361 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopFieldCollectorManager; +import org.apache.lucene.util.ArrayUtil; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.search.Queries; +import org.elasticsearch.core.IOUtils; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader; +import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; +import org.elasticsearch.index.mapper.MappingLookup; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.mapper.SourceFieldMetrics; +import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.index.translog.Translog; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +/** + * A {@link Translog.Snapshot} from changes in a Lucene index + */ +public final class LuceneBatchChangesSnapshot implements Translog.Snapshot { + static final int DEFAULT_BATCH_SIZE = 1024; + + private final int searchBatchSize; + private final long fromSeqNo, toSeqNo; + private long lastSeenSeqNo; + private int skippedOperations; + private final boolean requiredFullRange; + + private final IndexSearcher indexSearcher; + private int docIndex = 0; + private final boolean accessStats; + private final int totalHits; + private ScoreDoc[] scoreDocs; + private final Translog.Operation[] ops; + private final Closeable onClose; + + private final IndexVersion indexVersionCreated; + + private final StoredFieldLoader storedFieldLoader; + private final SourceLoader sourceLoader; + + /** + * Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range. 
 * + * @param engineSearcher the internal engine searcher which will be taken over if the snapshot is opened successfully + * @param searchBatchSize the number of documents should be returned by each search + * @param fromSeqNo the min requesting seq# - inclusive + * @param toSeqNo the maximum requesting seq# - inclusive + * @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo + * @param accessStats true if the stats of the snapshot can be accessed via {@link #totalOperations()} + * @param indexVersionCreated the version on which this index was created + */ + public LuceneBatchChangesSnapshot( + MappingLookup mappingLookup, + Engine.Searcher engineSearcher, + int searchBatchSize, + long fromSeqNo, + long toSeqNo, + boolean requiredFullRange, + boolean accessStats, + IndexVersion indexVersionCreated + ) throws IOException { + if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) { + throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]"); + } + if (searchBatchSize <= 0) { + throw new IllegalArgumentException("Search_batch_size must be positive [" + searchBatchSize + "]"); + } + final AtomicBoolean closed = new AtomicBoolean(); + this.onClose = () -> { + if (closed.compareAndSet(false, true)) { + IOUtils.close(engineSearcher); + } + }; + final long requestingSize = (toSeqNo - fromSeqNo) == Long.MAX_VALUE ? Long.MAX_VALUE : (toSeqNo - fromSeqNo + 1L); + this.searchBatchSize = requestingSize < searchBatchSize ? Math.toIntExact(requestingSize) : searchBatchSize; + this.fromSeqNo = fromSeqNo; + this.toSeqNo = toSeqNo; + this.lastSeenSeqNo = fromSeqNo - 1; + this.requiredFullRange = requiredFullRange; + this.indexSearcher = newIndexSearcher(engineSearcher); + this.indexSearcher.setQueryCache(null); + this.accessStats = accessStats; + this.ops = new Translog.Operation[this.searchBatchSize]; + this.indexVersionCreated = indexVersionCreated; + final TopDocs topDocs = searchOperations(null, accessStats); + this.totalHits = Math.toIntExact(topDocs.totalHits.value); + this.scoreDocs = topDocs.scoreDocs; + if (mappingLookup != null && mappingLookup.isSourceSynthetic()) { + Set<String> storedFields = mappingLookup.getMapping() + .syntheticFieldLoader() + .storedFieldLoaders() + .map(s -> s.getKey()) + .collect(Collectors.toSet()); + this.storedFieldLoader = StoredFieldLoader.create(false, storedFields); + this.sourceLoader = new SourceLoader.Synthetic(mappingLookup.getMapping()::syntheticFieldLoader, SourceFieldMetrics.NOOP); + } else { + this.storedFieldLoader = StoredFieldLoader.create(true, Set.of()); + this.sourceLoader = null; + } + fillOps(scoreDocs, ops); + } + + @Override + public void close() throws IOException { + onClose.close(); + } + + @Override + public int totalOperations() { + if (accessStats == false) { + throw new IllegalStateException("Access stats of a snapshot created with [access_stats] is false"); + } + return totalHits; + } + + @Override + public int skippedOperations() { + return skippedOperations; + } + + @Override + public Translog.Operation next() throws IOException { + Translog.Operation op = null; + for (int idx = nextDocIndex(); idx != -1; idx = nextDocIndex()) { + op = ops[idx]; + if (op != null) { + break; + } + } + if (requiredFullRange) { + rangeCheck(op); + } + if (op != null) { + lastSeenSeqNo = op.seqNo(); + } + return op; + } + + private void rangeCheck(Translog.Operation op) { + if (op == null) { + if (lastSeenSeqNo < toSeqNo) { + throw
new MissingHistoryOperationsException( + "Not all operations between from_seqno [" + + fromSeqNo + + "] " + + "and to_seqno [" + + toSeqNo + + "] found; prematurely terminated last_seen_seqno [" + + lastSeenSeqNo + + "]" + ); + } + } else { + final long expectedSeqNo = lastSeenSeqNo + 1; + if (op.seqNo() != expectedSeqNo) { + throw new MissingHistoryOperationsException( + "Not all operations between from_seqno [" + + fromSeqNo + + "] " + + "and to_seqno [" + + toSeqNo + + "] found; expected seqno [" + + expectedSeqNo + + "]; found [" + + op + + "]" + ); + } + } + } + + private int nextDocIndex() throws IOException { + // we have processed all docs in the current search - fetch the next batch + if (docIndex == scoreDocs.length && docIndex > 0) { + final ScoreDoc prev = scoreDocs[scoreDocs.length - 1]; + scoreDocs = searchOperations((FieldDoc) prev, false).scoreDocs; + fillOps(scoreDocs, ops); + docIndex = 0; + } + if (docIndex < scoreDocs.length) { + int idx = docIndex; + docIndex++; + return idx; + } + return -1; + } + + private void fillOps(ScoreDoc[] scoreDocs, Translog.Operation[] ops) throws IOException { + for (int i = 0; i < scoreDocs.length; i++) { + scoreDocs[i].shardIndex = i; + } + // for better loading performance we sort the array by docID and + // then visit all leaves in order. + ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.doc)); + int docBase = -1; + int maxDoc = 0; + List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves(); + int readerIndex = 0; + LeafReaderContext leaf = null; + CombinedDocValues combinedDocValues = null; + LeafStoredFieldLoader leafStoredFieldLoader = null; + SourceLoader.Leaf leafSourceLoader = null; + for (ScoreDoc scoreDoc : scoreDocs) { + if (scoreDoc.doc >= docBase + maxDoc) { + do { + leaf = leaves.get(readerIndex++); + docBase = leaf.docBase; + maxDoc = leaf.reader().maxDoc(); + leafStoredFieldLoader = storedFieldLoader.getLoader(leaf, null); + if (sourceLoader != null) { + leafSourceLoader = sourceLoader.leaf(leaf.reader(), null); + } else { + leafSourceLoader = null; + } + } while (scoreDoc.doc >= docBase + maxDoc); + combinedDocValues = new CombinedDocValues(leaf.reader()); + } + final int segmentDocID = scoreDoc.doc - docBase; + final int index = scoreDoc.shardIndex; + + leafStoredFieldLoader.advanceTo(segmentDocID); + final BytesReference source; + if (sourceLoader != null) { + source = leafSourceLoader.source(leafStoredFieldLoader, segmentDocID).internalSourceRef(); + } else { + source = leafStoredFieldLoader.source(); + } + + final long primaryTerm = combinedDocValues.docPrimaryTerm(segmentDocID); + assert primaryTerm > 0 : "nested child document must be excluded"; + final long seqNo = combinedDocValues.docSeqNo(segmentDocID); + // Only pick the first seen seq# + if (seqNo == lastSeenSeqNo) { + skippedOperations++; + ops[index] = null; + continue; + } + final long version = combinedDocValues.docVersion(segmentDocID); + final Translog.Operation op; + final boolean isTombstone = combinedDocValues.isTombstone(segmentDocID); + if (isTombstone && leafStoredFieldLoader.id() == null) { + op = new Translog.NoOp(seqNo, primaryTerm, source.utf8ToString()); + assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]"; + assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Noop but soft_deletes field is not set [" + op + "]"; + } else { + final String id = leafStoredFieldLoader.id(); + if (isTombstone) { + op = new Translog.Delete(id, seqNo, primaryTerm, version); + assert
assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + op + "]"; + } else { + if (source == null) { + // TODO: Callers should ask for the range that source should be retained. Thus we should always + // check for the existence source once we make peer-recovery to send ops after the local checkpoint. + if (requiredFullRange) { + throw new MissingHistoryOperationsException( + "source not found for seqno=" + seqNo + " from_seqno=" + fromSeqNo + " to_seqno=" + toSeqNo + ); + } else { + skippedOperations++; + ops[index] = null; + continue; + } + } + // TODO: pass the latest timestamp from engine. + final long autoGeneratedIdTimestamp = -1; + op = new Translog.Index( + id, + seqNo, + primaryTerm, + version, + source, + leafStoredFieldLoader.routing(), + autoGeneratedIdTimestamp + ); + } + } + assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() + : "Unexpected operation; " + + "last_seen_seqno [" + + lastSeenSeqNo + + "], from_seqno [" + + fromSeqNo + + "], to_seqno [" + + toSeqNo + + "], op [" + + op + + "]"; + ops[index] = op; + } + } + + private static IndexSearcher newIndexSearcher(Engine.Searcher engineSearcher) throws IOException { + return new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader())); + } + + private static Query rangeQuery(long fromSeqNo, long toSeqNo, IndexVersion indexVersionCreated) { + return new BooleanQuery.Builder().add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST) + .add(Queries.newNonNestedFilter(indexVersionCreated), BooleanClause.Occur.MUST) // exclude non-root nested documents + .build(); + } + + static int countOperations(Engine.Searcher engineSearcher, long fromSeqNo, long toSeqNo, IndexVersion indexVersionCreated) + throws IOException { + if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) { + throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]"); + } + return newIndexSearcher(engineSearcher).count(rangeQuery(fromSeqNo, toSeqNo, indexVersionCreated)); + } + + private TopDocs searchOperations(FieldDoc after, boolean accurateTotalHits) throws IOException { + final Query rangeQuery = rangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo, indexVersionCreated); + assert accurateTotalHits == false || after == null : "accurate total hits is required by the first batch only"; + final SortField sortBySeqNo = new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG); + TopFieldCollectorManager topFieldCollectorManager = new TopFieldCollectorManager( + new Sort(sortBySeqNo), + searchBatchSize, + after, + accurateTotalHits ? 
Integer.MAX_VALUE : 0 + ); + return indexSearcher.search(rangeQuery, topFieldCollectorManager); + } + + private static boolean assertDocSoftDeleted(LeafReader leafReader, int segmentDocId) throws IOException { + final NumericDocValues ndv = leafReader.getNumericDocValues(Lucene.SOFT_DELETES_FIELD); + if (ndv == null || ndv.advanceExact(segmentDocId) == false) { + throw new IllegalStateException("DocValues for field [" + Lucene.SOFT_DELETES_FIELD + "] is not found"); + } + return ndv.longValue() == 1; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index 05cc6d148be5e..2e008e0ff92bf 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -46,8 +46,8 @@ /** * A {@link Translog.Snapshot} from changes in a Lucene index */ -final class LuceneChangesSnapshot implements Translog.Snapshot { - static final int DEFAULT_BATCH_SIZE = 1024; +public final class LuceneChangesSnapshot implements Translog.Snapshot { + public static final int DEFAULT_BATCH_SIZE = 1024; private final int searchBatchSize; private final long fromSeqNo, toSeqNo; @@ -83,7 +83,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { * @param accessStats true if the stats of the snapshot can be accessed via {@link #totalOperations()} * @param indexVersionCreated the version on which this index was created */ - LuceneChangesSnapshot( + public LuceneChangesSnapshot( Engine.Searcher engineSearcher, int searchBatchSize, long fromSeqNo, diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshotNew.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshotNew.java new file mode 100644 index 0000000000000..2cd1871a9aef1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshotNew.java @@ -0,0 +1,382 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopFieldCollectorManager; +import org.apache.lucene.util.ArrayUtil; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.search.Queries; +import org.elasticsearch.core.IOUtils; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.transport.Transports; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A {@link Translog.Snapshot} from changes in a Lucene index + */ +public final class LuceneChangesSnapshotNew implements Translog.Snapshot { + public static final int DEFAULT_BATCH_SIZE = 1024; + + private final int searchBatchSize; + private final long fromSeqNo, toSeqNo; + private long lastSeenSeqNo; + private int skippedOperations; + private final boolean requiredFullRange; + private final boolean singleConsumer; + + private final IndexSearcher indexSearcher; + private int docIndex = 0; + private final boolean accessStats; + private final int totalHits; + private ScoreDoc[] scoreDocs; + private final ParallelArray parallelArray; + private final Closeable onClose; + + private final IndexVersion indexVersionCreated; + + private final Thread creationThread; // for assertion + + private final SourceLoader sourceLoader; + private final StoredFieldLoader storedFieldLoader; + + /** + * Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range. 
+ * + * @param engineSearcher the internal engine searcher which will be taken over if the snapshot is opened successfully + * @param searchBatchSize the number of documents should be returned by each search + * @param fromSeqNo the min requesting seq# - inclusive + * @param toSeqNo the maximum requesting seq# - inclusive + * @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo + * @param singleConsumer true if the snapshot is accessed by a single thread that creates the snapshot + * @param accessStats true if the stats of the snapshot can be accessed via {@link #totalOperations()} + * @param indexVersionCreated the version on which this index was created + */ + public LuceneChangesSnapshotNew( + SourceLoader sourceLoader, + Engine.Searcher engineSearcher, + int searchBatchSize, + long fromSeqNo, + long toSeqNo, + boolean requiredFullRange, + boolean singleConsumer, + boolean accessStats, + IndexVersion indexVersionCreated + ) throws IOException { + if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) { + throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]"); + } + if (searchBatchSize <= 0) { + throw new IllegalArgumentException("Search_batch_size must be positive [" + searchBatchSize + "]"); + } + final AtomicBoolean closed = new AtomicBoolean(); + this.onClose = () -> { + if (closed.compareAndSet(false, true)) { + IOUtils.close(engineSearcher); + } + }; + final long requestingSize = (toSeqNo - fromSeqNo) == Long.MAX_VALUE ? Long.MAX_VALUE : (toSeqNo - fromSeqNo + 1L); + this.creationThread = Thread.currentThread(); + this.searchBatchSize = requestingSize < searchBatchSize ? Math.toIntExact(requestingSize) : searchBatchSize; + this.fromSeqNo = fromSeqNo; + this.toSeqNo = toSeqNo; + this.lastSeenSeqNo = fromSeqNo - 1; + this.requiredFullRange = requiredFullRange; + this.singleConsumer = singleConsumer; + this.indexSearcher = newIndexSearcher(engineSearcher); + this.indexSearcher.setQueryCache(null); + this.accessStats = accessStats; + this.parallelArray = new ParallelArray(this.searchBatchSize); + this.indexVersionCreated = indexVersionCreated; + final TopDocs topDocs = searchOperations(null, accessStats); + this.totalHits = Math.toIntExact(topDocs.totalHits.value); + this.scoreDocs = topDocs.scoreDocs; + this.sourceLoader = sourceLoader; + this.storedFieldLoader = StoredFieldLoader.create(false, sourceLoader.requiredStoredFields()); + fillParallelArray(scoreDocs, parallelArray); + } + + @Override + public void close() throws IOException { + assert assertAccessingThread(); + onClose.close(); + } + + @Override + public int totalOperations() { + assert assertAccessingThread(); + if (accessStats == false) { + throw new IllegalStateException("Access stats of a snapshot created with [access_stats] is false"); + } + return totalHits; + } + + @Override + public int skippedOperations() { + assert assertAccessingThread(); + return skippedOperations; + } + + @Override + public Translog.Operation next() throws IOException { + assert assertAccessingThread(); + Translog.Operation op = null; + for (int idx = nextDocIndex(); idx != -1; idx = nextDocIndex()) { + op = readDocAsOp(idx); + if (op != null) { + break; + } + } + if (requiredFullRange) { + rangeCheck(op); + } + if (op != null) { + lastSeenSeqNo = op.seqNo(); + } + return op; + } + + private boolean assertAccessingThread() { + assert singleConsumer == false || creationThread == Thread.currentThread() + : 
"created by [" + creationThread + "] != current thread [" + Thread.currentThread() + "]"; + assert Transports.assertNotTransportThread("reading changes snapshot may involve slow IO"); + return true; + } + + private void rangeCheck(Translog.Operation op) { + if (op == null) { + if (lastSeenSeqNo < toSeqNo) { + throw new MissingHistoryOperationsException( + "Not all operations between from_seqno [" + + fromSeqNo + + "] " + + "and to_seqno [" + + toSeqNo + + "] found; prematurely terminated last_seen_seqno [" + + lastSeenSeqNo + + "]" + ); + } + } else { + final long expectedSeqNo = lastSeenSeqNo + 1; + if (op.seqNo() != expectedSeqNo) { + throw new MissingHistoryOperationsException( + "Not all operations between from_seqno [" + + fromSeqNo + + "] " + + "and to_seqno [" + + toSeqNo + + "] found; expected seqno [" + + expectedSeqNo + + "]; found [" + + op + + "]" + ); + } + } + } + + private int nextDocIndex() throws IOException { + // we have processed all docs in the current search - fetch the next batch + if (docIndex == scoreDocs.length && docIndex > 0) { + final ScoreDoc prev = scoreDocs[scoreDocs.length - 1]; + scoreDocs = searchOperations((FieldDoc) prev, false).scoreDocs; + fillParallelArray(scoreDocs, parallelArray); + docIndex = 0; + } + if (docIndex < scoreDocs.length) { + int idx = docIndex; + docIndex++; + return idx; + } + return -1; + } + + private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray) throws IOException { + if (scoreDocs.length > 0) { + for (int i = 0; i < scoreDocs.length; i++) { + scoreDocs[i].shardIndex = i; + } + // for better loading performance we sort the array by docID and + // then visit all leaves in order. + if (parallelArray.useSequentialStoredFieldsReader == false) { + ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.doc)); + } + int docBase = -1; + int maxDoc = 0; + List leaves = indexSearcher.getIndexReader().leaves(); + int readerIndex = 0; + CombinedDocValues combinedDocValues = null; + LeafReaderContext leaf = null; + for (ScoreDoc scoreDoc : scoreDocs) { + if (scoreDoc.doc >= docBase + maxDoc) { + do { + leaf = leaves.get(readerIndex++); + docBase = leaf.docBase; + maxDoc = leaf.reader().maxDoc(); + } while (scoreDoc.doc >= docBase + maxDoc); + combinedDocValues = new CombinedDocValues(leaf.reader()); + } + final int segmentDocID = scoreDoc.doc - docBase; + final int index = scoreDoc.shardIndex; + parallelArray.leafReaderContexts[index] = leaf; + parallelArray.seqNo[index] = combinedDocValues.docSeqNo(segmentDocID); + parallelArray.primaryTerm[index] = combinedDocValues.docPrimaryTerm(segmentDocID); + parallelArray.version[index] = combinedDocValues.docVersion(segmentDocID); + parallelArray.isTombStone[index] = combinedDocValues.isTombstone(segmentDocID); + parallelArray.hasRecoverySource[index] = combinedDocValues.hasRecoverySource(segmentDocID); + } + // now sort back based on the shardIndex. 
we use this to store the previous index + if (parallelArray.useSequentialStoredFieldsReader == false) { + ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.shardIndex)); + } + } + } + + private static IndexSearcher newIndexSearcher(Engine.Searcher engineSearcher) throws IOException { + return new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader())); + } + + private static Query rangeQuery(long fromSeqNo, long toSeqNo, IndexVersion indexVersionCreated) { + return new BooleanQuery.Builder().add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST) + .add(Queries.newNonNestedFilter(indexVersionCreated), BooleanClause.Occur.MUST) // exclude non-root nested documents + .build(); + } + + static int countOperations(Engine.Searcher engineSearcher, long fromSeqNo, long toSeqNo, IndexVersion indexVersionCreated) + throws IOException { + if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) { + throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]"); + } + return newIndexSearcher(engineSearcher).count(rangeQuery(fromSeqNo, toSeqNo, indexVersionCreated)); + } + + private TopDocs searchOperations(FieldDoc after, boolean accurateTotalHits) throws IOException { + final Query rangeQuery = rangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo, indexVersionCreated); + assert accurateTotalHits == false || after == null : "accurate total hits is required by the first batch only"; + final SortField sortBySeqNo = new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG); + TopFieldCollectorManager topFieldCollectorManager = new TopFieldCollectorManager( + new Sort(sortBySeqNo), + searchBatchSize, + after, + accurateTotalHits ? Integer.MAX_VALUE : 0, + false + ); + return indexSearcher.search(rangeQuery, topFieldCollectorManager); + } + + private Translog.Operation readDocAsOp(int docIndex) throws IOException { + final LeafReaderContext leaf = parallelArray.leafReaderContexts[docIndex]; + final int segmentDocID = scoreDocs[docIndex].doc - leaf.docBase; + final long primaryTerm = parallelArray.primaryTerm[docIndex]; + assert primaryTerm > 0 : "nested child document must be excluded"; + final long seqNo = parallelArray.seqNo[docIndex]; + // Only pick the first seen seq# + if (seqNo == lastSeenSeqNo) { + skippedOperations++; + return null; + } + final long version = parallelArray.version[docIndex]; + var sourceLeaf = sourceLoader.leaf(leaf.reader(), null); + var fields = storedFieldLoader.getLoader(leaf, null); + fields.advanceTo(segmentDocID); + final Translog.Operation op; + final boolean isTombstone = parallelArray.isTombStone[docIndex]; + if (isTombstone && fields.id() == null) { + op = new Translog.NoOp(seqNo, primaryTerm, "tombstone"); + assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]"; + assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Noop but soft_deletes field is not set [" + op + "]"; + } else { + final String id = fields.id(); + if (isTombstone) { + op = new Translog.Delete(id, seqNo, primaryTerm, version); + assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + op + "]"; + } else { + final BytesReference source = sourceLeaf.source(fields, segmentDocID).internalSourceRef(); + if (source == null) { + // TODO: Callers should ask for the range that source should be retained. 
Thus we should always + // check for the existence source once we make peer-recovery to send ops after the local checkpoint. + if (requiredFullRange) { + throw new MissingHistoryOperationsException( + "source not found for seqno=" + seqNo + " from_seqno=" + fromSeqNo + " to_seqno=" + toSeqNo + ); + } else { + skippedOperations++; + return null; + } + } + // TODO: pass the latest timestamp from engine. + final long autoGeneratedIdTimestamp = -1; + op = new Translog.Index(id, seqNo, primaryTerm, version, source, fields.routing(), autoGeneratedIdTimestamp); + } + } + assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() + : "Unexpected operation; " + + "last_seen_seqno [" + + lastSeenSeqNo + + "], from_seqno [" + + fromSeqNo + + "], to_seqno [" + + toSeqNo + + "], op [" + + op + + "]"; + return op; + } + + private static boolean assertDocSoftDeleted(LeafReader leafReader, int segmentDocId) throws IOException { + final NumericDocValues ndv = leafReader.getNumericDocValues(Lucene.SOFT_DELETES_FIELD); + if (ndv == null || ndv.advanceExact(segmentDocId) == false) { + throw new IllegalStateException("DocValues for field [" + Lucene.SOFT_DELETES_FIELD + "] is not found"); + } + return ndv.longValue() == 1; + } + + private static final class ParallelArray { + final LeafReaderContext[] leafReaderContexts; + final long[] version; + final long[] seqNo; + final long[] primaryTerm; + final boolean[] isTombStone; + final boolean[] hasRecoverySource; + boolean useSequentialStoredFieldsReader = false; + + ParallelArray(int size) { + version = new long[size]; + seqNo = new long[size]; + primaryTerm = new long[size]; + isTombStone = new boolean[size]; + hasRecoverySource = new boolean[size]; + leafReaderContexts = new LeafReaderContext[size]; + } + } +}
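
Note: the benchmark added above is a standard JMH benchmark, so besides the Gradle run task of the benchmarks module it can also be launched programmatically. The launcher below is a minimal sketch and is not part of this change; the class name LuceneChangesSnapshotBenchmarkLauncher is hypothetical, and it assumes the JMH runner classes (org.openjdk.jmh.runner.Runner, org.openjdk.jmh.runner.options.OptionsBuilder) are on the classpath, as they are for the benchmarks module.

    // Hypothetical JMH launcher (not included in the patch). It selects only the
    // benchmark added above and inherits its @Fork/@Warmup/@Measurement settings.
    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    public class LuceneChangesSnapshotBenchmarkLauncher {
        public static void main(String[] args) throws RunnerException {
            Options options = new OptionsBuilder()
                // regex matching the benchmark class added in this change
                .include("LuceneChangesSnapshotBenchmark")
                .build();
            new Runner(options).run();
        }
    }

Either way, JMH expands the sourceMode and sequential @Param values automatically, so a single run covers all four combinations: the synthetic runs exercise LuceneChangesSnapshotNew, while the stored runs fall back to LuceneChangesSnapshot.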