From d0263411cee3f258c4de7b5437c6b5149d24910b Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Wed, 24 Jan 2024 13:17:58 +0000 Subject: [PATCH] Get from translog fails with large dense_vector This change fixes the engine to apply the current codec when retrieving documents from the translog. We need to use the same codec than the main index in order to ensure that all the source data is indexable. The internal codec treats some fields differently than the default one, for instance dense_vectors are limited to 1024 dimensions. This PR ensures that these customizations are applied when indexing document for translog retrieval. Closes #104639 --- .../index/engine/InternalEngine.java | 2 +- .../index/engine/TranslogDirectoryReader.java | 16 +++-- .../index/shard/ShardGetServiceTests.java | 66 +++++++++++++++---- 3 files changed, 63 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 85b9417816e0f..92ff4c23b2dfa 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -811,7 +811,7 @@ private GetResult getFromTranslog( index, mappingLookup, documentParser, - config().getAnalyzer(), + config(), translogInMemorySegmentsCount::incrementAndGet ); final Engine.Searcher searcher = new Engine.Searcher( diff --git a/server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java b/server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java index a09810750c66e..6e1c01c886145 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java +++ b/server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java @@ -8,7 +8,6 @@ package org.elasticsearch.index.engine; -import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.index.BaseTermsEnum; import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.ByteVectorValues; @@ -83,10 +82,10 @@ final class TranslogDirectoryReader extends DirectoryReader { Translog.Index operation, MappingLookup mappingLookup, DocumentParser documentParser, - Analyzer analyzer, + EngineConfig engineConfig, Runnable onSegmentCreated ) throws IOException { - this(new TranslogLeafReader(shardId, operation, mappingLookup, documentParser, analyzer, onSegmentCreated)); + this(new TranslogLeafReader(shardId, operation, mappingLookup, documentParser, engineConfig, onSegmentCreated)); } private TranslogDirectoryReader(TranslogLeafReader leafReader) throws IOException { @@ -205,7 +204,7 @@ private static class TranslogLeafReader extends LeafReader { private final Translog.Index operation; private final MappingLookup mappingLookup; private final DocumentParser documentParser; - private final Analyzer analyzer; + private final EngineConfig engineConfig; private final Directory directory; private final Runnable onSegmentCreated; @@ -217,14 +216,14 @@ private static class TranslogLeafReader extends LeafReader { Translog.Index operation, MappingLookup mappingLookup, DocumentParser documentParser, - Analyzer analyzer, + EngineConfig engineConfig, Runnable onSegmentCreated ) { this.shardId = shardId; this.operation = operation; this.mappingLookup = mappingLookup; this.documentParser = documentParser; - this.analyzer = analyzer; + this.engineConfig = engineConfig; this.onSegmentCreated = onSegmentCreated; this.directory = new ByteBuffersDirectory(); this.uid = Uid.encodeId(operation.id()); @@ -264,7 +263,10 @@ private LeafReader createInMemoryLeafReader() { parsedDocs.updateSeqID(operation.seqNo(), operation.primaryTerm()); parsedDocs.version().setLongValue(operation.version()); - final IndexWriterConfig writeConfig = new IndexWriterConfig(analyzer).setOpenMode(IndexWriterConfig.OpenMode.CREATE); + // To guarantee indexability, we configure the analyzer and codec using the main engine configuration + final IndexWriterConfig writeConfig = new IndexWriterConfig(engineConfig.getAnalyzer()).setOpenMode( + IndexWriterConfig.OpenMode.CREATE + ).setCodec(engineConfig.getCodec()); try (IndexWriter writer = new IndexWriter(directory, writeConfig)) { writer.addDocument(parsedDocs.rootDoc()); final DirectoryReader reader = open(writer); diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java index 9e1be4c629b4a..20493ee576c0a 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.xcontent.XContentType; import java.io.IOException; +import java.util.Arrays; import java.util.function.LongSupplier; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; @@ -114,6 +115,20 @@ public void testGetFromTranslogWithSyntheticSource() throws IOException { runGetFromTranslogWithOptions(docToIndex, sourceOptions, expectedFetchedSource, "\"long\"", 7L, true); } + public void testGetFromTranslogWithDenseVector() throws IOException { + float[] vector = new float[2048]; + for (int i = 0; i < vector.length; i++) { + vector[i] = randomFloat(); + } + String docToIndex = Strings.format(""" + { + "bar": %s, + "foo": "foo" + } + """, Arrays.toString(vector)); + runGetFromTranslogWithOptions(docToIndex, "\"enabled\": true", docToIndex, "\"text\"", "foo", "\"dense_vector\"", false); + } + private void runGetFromTranslogWithOptions( String docToIndex, String sourceOptions, @@ -122,23 +137,48 @@ private void runGetFromTranslogWithOptions( Object expectedFooVal, boolean sourceOnlyFetchCreatesInMemoryReader ) throws IOException { - IndexMetadata metadata = IndexMetadata.builder("test").putMapping(Strings.format(""" - { - "properties": { - "foo": { - "type": %s, - "store": true - }, - "bar": { "type": %s } - }, - "_source": { %s } - } - }""", fieldType, fieldType, sourceOptions)).settings(indexSettings(IndexVersion.current(), 1, 1)).primaryTerm(0, 1).build(); + runGetFromTranslogWithOptions( + docToIndex, + sourceOptions, + expectedResult, + fieldType, + expectedFooVal, + fieldType, + sourceOnlyFetchCreatesInMemoryReader + ); + } + + private void runGetFromTranslogWithOptions( + String docToIndex, + String sourceOptions, + String expectedResult, + String fieldTypeFoo, + Object expectedFooVal, + String fieldTypeBar, + boolean sourceOnlyFetchCreatesInMemoryReader + ) throws IOException { + IndexMetadata metadata = IndexMetadata.builder("test") + .putMapping(Strings.format(""" + { + "properties": { + "foo": { + "type": %s, + "store": true + }, + "bar": { "type": %s } + }, + "_source": { %s } + } + }""", fieldTypeFoo, fieldTypeBar, sourceOptions)) + .settings(indexSettings(IndexVersion.current(), 1, 1)) + .primaryTerm(0, 1) + .build(); IndexShard primary = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, EngineTestCase.randomReaderWrapper()); recoverShardFromStore(primary); LongSupplier translogInMemorySegmentCount = ((InternalEngine) primary.getEngine()).translogInMemorySegmentsCount::get; long translogInMemorySegmentCountExpected = 0; - indexDoc(primary, "test", "0", docToIndex); + Engine.IndexResult res = indexDoc(primary, "test", "0", docToIndex); + assertTrue(res.isCreated()); assertTrue(primary.getEngine().refreshNeeded()); GetResult testGet = primary.getService().getForUpdate("0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME));