From da38044f2c16477c353d367b408c7584a70147de Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Tue, 17 Sep 2024 16:13:02 +0100 Subject: [PATCH] Add a way to replace the value for a specific field in the original source with a patch id --- .../index/mapper/DocumentParserContext.java | 39 ++ .../elasticsearch/index/mapper/Mapper.java | 11 + .../elasticsearch/index/mapper/Mapping.java | 4 + .../index/mapper/NestedObjectMapper.java | 64 ++- .../index/mapper/ObjectMapper.java | 26 ++ .../index/mapper/PatchSourceUtils.java | 87 ++++ .../index/mapper/SourceFieldMapper.java | 98 +++- .../index/mapper/SourceLoader.java | 179 +++++++- .../index/mapper/PatchSourceMapperTests.java | 432 ++++++++++++++++++ .../index/mapper/SourceFieldMapperTests.java | 76 +++ 10 files changed, 994 insertions(+), 22 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/mapper/PatchSourceUtils.java create mode 100644 server/src/test/java/org/elasticsearch/index/mapper/PatchSourceMapperTests.java diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParserContext.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParserContext.java index 26db211345eae..5e852d05331b1 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParserContext.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParserContext.java @@ -17,8 +17,10 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.analysis.IndexAnalyzers; import org.elasticsearch.index.mapper.MapperService.MergeReason; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xcontent.FilterXContentParserWrapper; import org.elasticsearch.xcontent.FlatteningXContentParser; +import org.elasticsearch.xcontent.XContentLocation; import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; @@ -36,6 +38,8 @@ * the lucene data structures and mappings to be dynamically created as the outcome of parsing a document. */ public abstract class DocumentParserContext { + public record XContentPatch(String fullPath, XContentLocation location, int id) {} + /** * Wraps a given context while allowing to override some of its behaviour by re-implementing some of the non final methods */ @@ -124,6 +128,7 @@ public int get() { * in this document and therefore is not present in mapping yet. */ private final Set copyToFields; + private final Map sourcePatches; // Indicates if the source for this context has been cloned and gets parsed multiple times. 
private boolean clonedSource; @@ -146,6 +151,7 @@ private DocumentParserContext( Set fieldsAppliedFromTemplates, Set copyToFields, DynamicMapperSize dynamicMapperSize, + Map sourcePatches, boolean clonedSource ) { this.mappingLookup = mappingLookup; @@ -165,6 +171,7 @@ private DocumentParserContext( this.fieldsAppliedFromTemplates = fieldsAppliedFromTemplates; this.copyToFields = copyToFields; this.dynamicMappersSize = dynamicMapperSize; + this.sourcePatches = sourcePatches; this.clonedSource = clonedSource; } @@ -187,6 +194,7 @@ private DocumentParserContext(ObjectMapper parent, ObjectMapper.Dynamic dynamic, in.fieldsAppliedFromTemplates, in.copyToFields, in.dynamicMappersSize, + in.sourcePatches, in.clonedSource ); } @@ -216,6 +224,7 @@ protected DocumentParserContext( new HashSet<>(), new HashSet<>(mappingLookup.fieldTypesLookup().getCopyToDestinationFields()), new DynamicMapperSize(), + new HashMap<>(), false ); } @@ -300,6 +309,36 @@ public final void addToFieldNames(String field) { } } + /** + * Registers a patch for a specific field at a given location in the original source. + * The patch modifies the field value in the original `_source` by replacing it with a numerical ID, + * which is then stored in a document field accessible via {@link SearchSourceBuilder#equals(Object)}. + * + * The original field value must still be retrievable when the original `_source` is requested. + * This responsibility lies with the {@link FieldMapper}, ensuring that the patched field can provide + * the original value during retrieval. + * + * Note: Only one patch per field is permitted for each document. + * + * @param fieldMapper The {@link FieldMapper} responsible for applying the patch to the field. + * @param xContentLocation The {@link XContentLocation} representing the location of the field in the original source. + */ + public void addSourceFieldPatch(FieldMapper fieldMapper, XContentLocation xContentLocation) { + var sourceFieldMapper = (SourceFieldMapper) getMetadataMapper(SourceFieldMapper.NAME); + if (sourceFieldMapper == null) { + return; + } + sourceFieldMapper.indexFieldPatch(doc(), fieldMapper, xContentLocation, getSourcePatches()); + } + + /** + * Returns all {@link XContentPatch} currently registered in this context, + * mapped by their {@link XContentLocation} in the original `_source`. + */ + public Map getSourcePatches() { + return sourcePatches; + } + public final Field version() { return this.version; } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java b/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java index ed8b3be7b2592..5424729e41795 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java @@ -18,6 +18,7 @@ import org.elasticsearch.index.IndexVersions; import org.elasticsearch.xcontent.ToXContentFragment; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentLocation; import java.io.IOException; import java.util.Arrays; @@ -175,6 +176,16 @@ public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() { throw new IllegalArgumentException("field [" + fullPath() + "] of type [" + typeName() + "] doesn't support synthetic source"); } + /** + * Creates a {@link SourceLoader.PatchFieldLoader} to load patches that were previously indexed + * with {@link DocumentParserContext#addSourceFieldPatch(FieldMapper, XContentLocation)}. + * + * Returns {@code null} if the field doesn't allow patching. 
+ */ + protected SourceLoader.PatchFieldLoader patchFieldLoader() { + return null; + } + @Override public String toString() { return Strings.toString(this); diff --git a/server/src/main/java/org/elasticsearch/index/mapper/Mapping.java b/server/src/main/java/org/elasticsearch/index/mapper/Mapping.java index 52bc48004ccda..a98cc7c654e29 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/Mapping.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/Mapping.java @@ -131,6 +131,10 @@ public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() { return root.syntheticFieldLoader(stream); } + protected SourceLoader.PatchFieldLoader patchFieldLoader() { + return root.patchFieldLoader(); + } + /** * Merges a new mapping into the existing one. * diff --git a/server/src/main/java/org/elasticsearch/index/mapper/NestedObjectMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/NestedObjectMapper.java index adf1b329d9e83..5612611e3de97 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/NestedObjectMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/NestedObjectMapper.java @@ -10,6 +10,7 @@ package org.elasticsearch.index.mapper; import org.apache.lucene.index.LeafReader; +import org.apache.lucene.search.CheckedIntConsumer; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; @@ -376,6 +377,31 @@ protected MapperMergeContext createChildContext(MapperMergeContext mapperMergeCo ); } + @Override + protected SourceLoader.PatchFieldLoader patchFieldLoader() { + var patchLoader = super.patchFieldLoader(); + if (patchLoader == null) { + return null; + } + return context -> { + try { + var patchFieldLoader = patchLoader.leaf(context); + if (patchFieldLoader == null) { + return null; + } + IndexSearcher searcher = new IndexSearcher(context.reader()); + searcher.setQueryCache(null); + var childScorer = searcher.createWeight(nestedTypeFilter, ScoreMode.COMPLETE_NO_SCORES, 1f).scorer(context); + var parentsDocs = bitsetProducer.apply(parentTypeFilter).getBitSet(context); + return (doc, acc) -> { + collectChildren(nestedTypePath, doc, parentsDocs, childScorer.iterator(), child -> patchFieldLoader.load(child, acc)); + }; + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + @Override public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() { if (storeArraySource()) { @@ -427,7 +453,8 @@ public DocValuesLoader docValuesLoader(LeafReader leafReader, int[] docIdsInLeaf if (childScorer != null) { var parentDocs = parentBitSetProducer.get().getBitSet(leafReader.getContext()); return parentDoc -> { - collectChildren(parentDoc, parentDocs, childScorer.iterator()); + children.clear(); + collectChildren(nestedTypePath, parentDoc, parentDocs, childScorer.iterator(), child -> children.add(child)); return children.size() > 0; }; } else { @@ -435,21 +462,6 @@ public DocValuesLoader docValuesLoader(LeafReader leafReader, int[] docIdsInLeaf } } - private List collectChildren(int parentDoc, BitSet parentDocs, DocIdSetIterator childIt) throws IOException { - assert parentDoc < 0 || parentDocs.get(parentDoc) : "wrong context, doc " + parentDoc + " is not a parent of " + nestedTypePath; - final int prevParentDoc = parentDoc > 0 ? 
parentDocs.prevSetBit(parentDoc - 1) : -1; - int childDocId = childIt.docID(); - if (childDocId <= prevParentDoc) { - childDocId = childIt.advance(prevParentDoc + 1); - } - - children.clear(); - for (; childDocId < parentDoc; childDocId = childIt.nextDoc()) { - children.add(childDocId); - } - return children; - } - @Override public boolean hasValue() { return children.size() > 0; @@ -477,4 +489,24 @@ public String fieldName() { return NestedObjectMapper.this.fullPath(); } } + + private static void collectChildren( + String nestedTypePath, + int parentDoc, + BitSet parentDocs, + DocIdSetIterator childIt, + CheckedIntConsumer childConsumer + ) throws IOException { + assert parentDoc < 0 || parentDocs.get(parentDoc) : "wrong context, doc " + parentDoc + " is not a parent of " + nestedTypePath; + final int prevParentDoc = parentDoc > 0 ? parentDocs.prevSetBit(parentDoc - 1) : -1; + int childDocId = childIt.docID(); + if (childDocId <= prevParentDoc) { + childDocId = childIt.advance(prevParentDoc + 1); + } + + for (; childDocId < parentDoc; childDocId = childIt.nextDoc()) { + childConsumer.accept(childDocId); + } + } + } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java index e4ce38d6cec0b..7f5ef060a6b0b 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java @@ -37,6 +37,7 @@ import java.util.Objects; import java.util.Optional; import java.util.TreeMap; +import java.util.stream.Collectors; import java.util.stream.Stream; public class ObjectMapper extends Mapper { @@ -808,6 +809,31 @@ protected void doXContent(XContentBuilder builder, Params params) throws IOExcep } + @Override + protected SourceLoader.PatchFieldLoader patchFieldLoader() { + var loaders = mappers.values().stream().map(m -> m.patchFieldLoader()).filter(l -> l != null).collect(Collectors.toList()); + if (loaders.isEmpty()) { + return null; + } + return context -> { + final List leaves = new ArrayList<>(); + for (var loader : loaders) { + var leaf = loader.leaf(context); + if (leaf != null) { + leaves.add(leaf); + } + } + if (leaves.isEmpty()) { + return null; + } + return (doc, acc) -> { + for (var leaf : leaves) { + leaf.load(doc, acc); + } + }; + }; + } + protected SourceLoader.SyntheticFieldLoader syntheticFieldLoader(Stream mappers, boolean isFragment) { var fields = mappers.sorted(Comparator.comparing(Mapper::fullPath)) .map(Mapper::syntheticFieldLoader) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/PatchSourceUtils.java b/server/src/main/java/org/elasticsearch/index/mapper/PatchSourceUtils.java new file mode 100644 index 0000000000000..c321b3dfe4ab9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/mapper/PatchSourceUtils.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */
+
+package org.elasticsearch.index.mapper;
+
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.xcontent.XContent;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentGenerator;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class PatchSourceUtils {
+    public interface CheckedTriConsumer<S, T, U> {
+        void apply(S s, T t, U u) throws IOException;
+    }
+
+    /**
+     * Parses the given {@code source} and returns a new version with all values referenced by the
+     * provided {@code patchFullPaths} replaced using the {@code patchApply} consumer.
+     */
+    public static BytesReference patchSource(
+        BytesReference source,
+        XContent xContent,
+        Set<String> patchFullPaths,
+        CheckedTriConsumer<String, XContentParser, XContentGenerator> patchApply
+    ) throws IOException {
+        BytesStreamOutput streamOutput = new BytesStreamOutput();
+        XContentBuilder builder = new XContentBuilder(xContent, streamOutput);
+        try (XContentParser parser = XContentHelper.createParserNotCompressed(XContentParserConfiguration.EMPTY, source, xContent.type())) {
+            if ((parser.currentToken() == null) && (parser.nextToken() == null)) {
+                return source;
+            }
+            parseAndPatchSource(builder.generator(), parser, "", patchFullPaths, patchApply);
+            return BytesReference.bytes(builder);
+        }
+    }
+
+    private static void parseAndPatchSource(
+        XContentGenerator destination,
+        XContentParser parser,
+        String fullPath,
+        Set<String> patchFullPaths,
+        CheckedTriConsumer<String, XContentParser, XContentGenerator> patchApply
+    ) throws IOException {
+        XContentParser.Token token = parser.currentToken();
+        if (token == XContentParser.Token.FIELD_NAME) {
+            String fieldName = parser.currentName();
+            destination.writeFieldName(fieldName);
+            token = parser.nextToken();
+            fullPath = fullPath + (fullPath.isEmpty() ? "" : ".") + fieldName;
+            if (patchFullPaths.contains(fullPath)) {
+                patchApply.apply(fullPath, parser, destination);
+                return;
+            }
+        }
+
+        switch (token) {
+            case START_ARRAY -> {
+                destination.writeStartArray();
+                while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
+                    parseAndPatchSource(destination, parser, fullPath, patchFullPaths, patchApply);
+                }
+                destination.writeEndArray();
+            }
+            case START_OBJECT -> {
+                destination.writeStartObject();
+                while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
+                    parseAndPatchSource(destination, parser, fullPath, patchFullPaths, patchApply);
+                }
+                destination.writeEndObject();
+            }
+            // other tokens are simple values and can be copied through as-is
+            default -> destination.copyCurrentEvent(parser);
+        }
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java
index 3318595ed7129..a50815ded7562 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java
@@ -28,6 +28,8 @@
 import org.elasticsearch.index.query.SearchExecutionContext;
 import org.elasticsearch.search.lookup.Source;
 import org.elasticsearch.search.lookup.SourceFilter;
+import org.elasticsearch.xcontent.XContent;
+import org.elasticsearch.xcontent.XContentLocation;
 import org.elasticsearch.xcontent.XContentType;
 
 import java.io.IOException;
@@ -36,7 +38,10 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
 
+import static org.elasticsearch.index.mapper.DocumentParserContext.XContentPatch;
 import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_SOURCE_ENABLED_SETTING;
 
 public class SourceFieldMapper extends MetadataFieldMapper {
@@ -50,6 +55,7 @@ public class SourceFieldMapper extends MetadataFieldMapper {
     public static final NodeFeature SYNTHETIC_SOURCE_COPY_TO_FIX = new NodeFeature("mapper.source.synthetic_source_copy_to_fix");
 
     public static final String NAME = "_source";
+    public static final String PATCH_NAME = "_source_patch";
     public static final String RECOVERY_SOURCE_NAME = "_recovery_source";
 
     public static final String CONTENT_TYPE = "_source";
@@ -411,20 +417,21 @@ public boolean isComplete() {
     }
 
     @Override
-    public void preParse(DocumentParserContext context) throws IOException {
+    public void postParse(DocumentParserContext context) throws IOException {
         BytesReference originalSource = context.sourceToParse().source();
         XContentType contentType = context.sourceToParse().getXContentType();
-        final BytesReference adaptedSource = applyFilters(originalSource, contentType);
+        final BytesReference patchSource = applyPatches(originalSource, contentType.xContent(), context.getSourcePatches());
+        final BytesReference adaptedSource = applyFilters(patchSource, contentType);
         if (adaptedSource != null) {
             assert context.indexSettings().getIndexVersionCreated().before(IndexVersions.V_8_7_0)
                 || indexMode == null
                 || indexMode.isSyntheticSourceEnabled() == false;
-            final BytesRef ref = adaptedSource.toBytesRef();
+            BytesRef ref = adaptedSource.toBytesRef();
             context.doc().add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
         }
 
-        if (enableRecoverySource && originalSource != null && adaptedSource != originalSource) {
+        if (enableRecoverySource && originalSource != null && adaptedSource != patchSource) {
            // if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
            BytesRef ref = originalSource.toBytesRef();
            context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
@@ -462,10 +469,91 @@ public SourceLoader newSourceLoader(Mapping mapping, SourceFieldMetrics metrics)
         if (mode == Mode.SYNTHETIC) {
             return new SourceLoader.Synthetic(mapping::syntheticFieldLoader, metrics);
         }
-        return SourceLoader.FROM_STORED_SOURCE;
+        var patchLoader = mapping.patchFieldLoader();
+        return patchLoader == null ? SourceLoader.FROM_STORED_SOURCE : new SourceLoader.PatchSourceLoader(patchLoader);
     }
 
     public boolean isSynthetic() {
         return mode == Mode.SYNTHETIC;
     }
+
+    /**
+     * Retrieves the name of the numeric doc-value field used to store the patch reference
+     * ID for the specified fullPath.
+     */
+    public static String getPatchFieldName(String fullPath) {
+        return PATCH_NAME + "." + fullPath;
+    }
+
+    /**
+     * Verifies and indexes a patch for a specific field in the document.
+     *
+     * Ensures only one patch is applied per field per document. If valid, a numeric doc-value field
+     * is added to reference the patch in the modified source.
+     *
+     * The numeric value acts as a unique reference to the patch. The reference field's name
+     * can be retrieved using {@link #getPatchFieldName(String)}.
+     */
+    public void indexFieldPatch(
+        LuceneDocument doc,
+        FieldMapper fieldMapper,
+        XContentLocation location,
+        Map<XContentLocation, XContentPatch> acc
+    ) {
+        if (enabled() == false || stored() == false) {
+            return;
+        }
+
+        if (isSynthetic()) {
+            throw new IllegalArgumentException(
+                "Patching the field ["
+                    + fieldMapper.leafName()
+                    + "] in the original source is not allowed when synthetic source is enabled."
+            );
+        }
+
+        // TODO handle includes and excludes
+        int id = acc.size();
+        var patch = new XContentPatch(fieldMapper.fullPath(), location, id);
+        if (acc.put(location, patch) != null) {
+            throw new IllegalArgumentException(
+                "Field [" + fieldMapper.fullPath() + "] does not support patching the same location [" + location + "] more than once."
+            );
+        }
+        String patchFieldName = getPatchFieldName(fieldMapper.fullPath());
+        if (doc.getByKey(patchFieldName) != null) {
+            throw new IllegalArgumentException(
+                "Field [" + patch.fullPath() + "] does not support patching multiple values for the same field in a single document."
+            );
+        }
+        doc.addWithKey(patchFieldName, new NumericDocValuesField(patchFieldName, patch.id()));
+    }
+
+    /**
+     * Applies the provided patches to the given source, replacing each {@link XContentPatch#fullPath()} with a numeric value.
+     *
+     * Throws an {@link IllegalArgumentException} if a patch is applied at a location that doesn't match
+     * its {@link XContentPatch#fullPath()}, or if no patch is registered at the specified location.
+     */
+    static BytesReference applyPatches(BytesReference source, XContent xContent, Map<XContentLocation, XContentPatch> patches)
+        throws IOException {
+        if (patches.isEmpty()) {
+            return source;
+        }
+        var patchFields = patches.values().stream().map(p -> p.fullPath()).collect(Collectors.toSet());
+        var res = PatchSourceUtils.patchSource(source, xContent, patchFields, (fullPath, parser, dest) -> {
+            var patch = patches.remove(parser.getTokenLocation());
+            if (patch == null) {
+                throw new IllegalArgumentException(
+                    "Cannot find patch registered at location: [" + parser.getTokenLocation() + "] for path: [" + fullPath + "]."
+ ); + } + dest.writeNumber(patch.id()); + parser.skipChildren(); + }); + if (patches.size() > 0) { + throw new IllegalArgumentException("Unable to apply all patches: " + patches.values()); + } + return res; + } } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/SourceLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/SourceLoader.java index 43bf6f2bd83dd..a70dd0e1df13a 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/SourceLoader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/SourceLoader.java @@ -10,16 +10,27 @@ package org.elasticsearch.index.mapper; import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParserUtils; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader; import org.elasticsearch.search.lookup.Source; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentGenerator; +import org.elasticsearch.xcontent.XContentLocation; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,6 +43,8 @@ * Loads source {@code _source} during a GET or {@code _search}. */ public interface SourceLoader { + record Patch(String fullPath, int id, BytesReference rawValue) {} + /** * Does this {@link SourceLoader} reorder field values? */ @@ -95,7 +108,7 @@ public void write(LeafStoredFieldLoader storedFields, int docId, XContentBuilder @Override public Set requiredStoredFields() { - return Set.of(); + return Set.of(SourceFieldMapper.NAME); } }; @@ -220,6 +233,110 @@ public void write(LeafStoredFieldLoader storedFieldLoader, int docId, XContentBu } } + /** + * Retrieves the {@code _source} from the stored fields and applies all {@link Patch} instances + * indexed via {@link DocumentParserContext#addSourceFieldPatch(FieldMapper, XContentLocation)} during indexing. + * Replaces all patch references in the retrieved {@code _source} with their actual values using the + * {@link PatchFieldLoader}. 
+ */ + class PatchSourceLoader implements SourceLoader { + final PatchFieldLoader field; + + public PatchSourceLoader(PatchFieldLoader field) { + this.field = field; + } + + @Override + public boolean reordersFieldValues() { + return false; + } + + @Override + public Set requiredStoredFields() { + return FROM_STORED_SOURCE.requiredStoredFields(); + } + + @Override + public Leaf leaf(LeafReader reader, int[] docIdsInLeaf) throws IOException { + var sourceLeaf = FROM_STORED_SOURCE.leaf(reader, docIdsInLeaf); + var patchLeaf = field.leaf(reader.getContext()); + return new Leaf() { + @Override + public Source source(LeafStoredFieldLoader storedFields, int docId) throws IOException { + Source source = sourceLeaf.source(storedFields, docId); + if (patchLeaf == null) { + return source; + } + List patches = new ArrayList<>(); + patchLeaf.load(docId, patches); + var finalPatches = verifyPatches(patches, docId); + if (finalPatches.length == 0) { + return source; + } + var patchSource = Source.fromBytes( + PatchSourceUtils.patchSource( + source.internalSourceRef(), + source.sourceContentType().xContent(), + patches.stream().map(p -> p.fullPath()).collect(Collectors.toSet()), + (fullPath, parser, destination) -> { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, parser.currentToken(), parser); + int id = parser.intValue(); + if (id > finalPatches.length) { + throw new IOException("Patch " + id + " not found for path " + fullPath); + } + var patch = finalPatches[id]; + if (patch == null || patch.fullPath.equals(fullPath) == false) { + throw new IOException("No patch found for path " + fullPath); + } + finalPatches[id] = null; + applyPatch(patch, destination); + } + ), + source.sourceContentType() + ); + List nonConsumed = Arrays.stream(finalPatches).filter(p -> p != null).collect(Collectors.toList()); + if (nonConsumed.size() > 0) { + throw new IOException("Some patches were not processed: " + nonConsumed); + } + return patchSource; + } + + @Override + public void write(LeafStoredFieldLoader storedFields, int docId, XContentBuilder b) throws IOException { + throw new IllegalStateException("This operation is not allowed in the current context"); + } + }; + } + + private static Patch[] verifyPatches(List patches, int docId) throws IOException { + Patch[] ordered = patches.stream().sorted(Comparator.comparingInt(Patch::id)).toArray(Patch[]::new); + for (int i = 0; i < ordered.length; i++) { + if (ordered[i].id() != i) { + throw new IOException("Registered patch " + i + " missing for document ID: " + docId); + } + } + return ordered; + } + + private static void applyPatch(Patch patch, XContentGenerator destination) throws IOException { + try ( + XContentParser patchParser = XContentHelper.createParserNotCompressed( + XContentParserConfiguration.EMPTY, + patch.rawValue, + XContentType.JSON + ) + ) { + // skip the start object and field name to expose the actual value + var patchToken = patchParser.nextToken(); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, patchToken, patchParser); + patchToken = patchParser.nextToken(); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, patchToken, patchParser); + patchParser.nextToken(); + destination.copyCurrentStructure(patchParser); + } + } + } + /** * Load a field for {@link Synthetic}. *

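// Illustration for reviewers (not part of the patch): a minimal sketch of what the write-side
// replacement in SourceFieldMapper#applyPatches produces, using the PatchSourceUtils API added
// earlier in this change. The literal JSON, the path "field" and the patch id 0 are made-up
// example inputs, not values taken from the diff.
//
//     BytesReference original = new BytesArray("{\"field\":{\"key1\":\"value1\"},\"another_field\":\"abc\"}");
//     BytesReference patched = PatchSourceUtils.patchSource(
//         original,
//         JsonXContent.jsonXContent,
//         Set.of("field"),
//         (fullPath, parser, dest) -> {
//             dest.writeNumber(0);      // write the patch id in place of the original value
//             parser.skipChildren();    // consume the replaced value
//         }
//     );
//     // patched now holds {"field":0,"another_field":"abc"}; PatchSourceLoader above and the
//     // PatchFieldLoader introduced below reverse this substitution when the _source is read back.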
@@ -355,6 +472,25 @@ interface DocValuesLoader { } } + /** + * Per-field loader that retrieves {@link Patch} references and their original values, as indexed by + * {@link SourceFieldMapper#indexFieldPatch(LuceneDocument, FieldMapper, XContentLocation, Map)}. + */ + interface PatchFieldLoader { + /** + * Returns a leaf loader if the provided context contains patches for the specified field; + * returns null otherwise. + */ + PatchFieldLoader.Leaf leaf(LeafReaderContext context) throws IOException; + + interface Leaf { + /** + * Loads all patches associated with the provided document into the specified {@code acc} list. + */ + void load(int doc, List acc) throws IOException; + } + } + /** * Synthetic field loader that uses only doc values to load synthetic source values. */ @@ -370,4 +506,45 @@ public void reset() { // since DocValuesLoader#advanceToDoc will reset the state anyway. } } + + /** + * A patch field loader that utilizes a {@link SyntheticFieldLoader} to fetch the original patched value. + */ + class SyntheticPatchFieldLoader implements PatchFieldLoader { + private final SyntheticFieldLoader syntheticField; + + public SyntheticPatchFieldLoader(SyntheticFieldLoader syntheticFieldLoader) { + this.syntheticField = syntheticFieldLoader; + } + + @Override + public Leaf leaf(LeafReaderContext context) throws IOException { + var patchLoader = context.reader().getNumericDocValues(SourceFieldMapper.getPatchFieldName(syntheticField.fieldName())); + if (patchLoader == null) { + return null; + } + var fieldLoader = syntheticField.docValuesLoader(context.reader(), null); + return (doc, acc) -> { + if (patchLoader.advanceExact(doc) == false) { + return; + } + int patch = (int) patchLoader.longValue(); + if (fieldLoader == null) { + throw new IOException("Missing value for patch field [" + syntheticField.fieldName() + "] in doc [" + doc + "]"); + } + fieldLoader.advanceToDoc(doc); + if (syntheticField.hasValue()) { + BytesStreamOutput streamOutput = new BytesStreamOutput(); + XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), streamOutput); + builder.startObject(); + syntheticField.write(builder); + builder.endObject(); + BytesReference rawValue = BytesReference.bytes(builder); + acc.add(new Patch(syntheticField.fieldName(), patch, rawValue)); + } else { + throw new IOException("Missing value for patch field [" + syntheticField.fieldName() + "] in doc [" + doc + "]"); + } + }; + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/PatchSourceMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/PatchSourceMapperTests.java new file mode 100644 index 0000000000000..2ee6b492f6e2b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/mapper/PatchSourceMapperTests.java @@ -0,0 +1,432 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.index.mapper; + +import org.apache.lucene.document.BinaryDocValuesField; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.plugins.MapperPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.lookup.Source; +import org.elasticsearch.search.lookup.SourceProvider; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; + +public class PatchSourceMapperTests extends MapperServiceTestCase { + @Override + protected Collection getPlugins() { + return List.of(new TestPlugin()); + } + + public void testPatchSourceFlat() throws IOException { + var mapperService = createMapperService(mapping(b -> { + // field + b.startObject("field"); + b.field("type", "patch"); + b.endObject(); + + // another_field + b.startObject("another_field"); + b.field("type", "keyword"); + b.endObject(); + })); + assertSourcePatch( + mapperService, + Map.of("field", Map.of("obj", Map.of("key1", "value1")), "another_field", randomAlphaOfLengthBetween(5, 10)) + ); + } + + public void testPatchSourceFlatWithCopyTo() throws IOException { + var mapperService = createMapperService(mapping(b -> { + // field + b.startObject("field"); + b.field("type", "patch"); + b.field("copy_to", new String[] { "another_field" }); + b.endObject(); + + // another_field + b.startObject("another_field"); + b.field("type", "keyword"); + b.endObject(); + })); + assertSourcePatch(mapperService, Map.of("field", "key1")); + } + + public void testPatchSourceFlatMulti() throws IOException { + var mapperService = createMapperService(mapping(b -> { + // field + b.startObject("field"); + b.field("type", "patch"); + b.endObject(); + + // another_field + b.startObject("another_field"); + b.field("type", "keyword"); + b.endObject(); + })); + var exc = expectThrows( + DocumentParsingException.class, + () -> assertSourcePatch( + mapperService, + Map.of( + "field", + List.of(Map.of("obj", Map.of("key1", "value1")), Map.of("another", "one")), + "another_field", + randomAlphaOfLengthBetween(5, 10) + ) + ) + ); + assertThat(exc.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(exc.getDetailedMessage(), containsString("doesn't support patching multiple values")); + } + + public void testPatchSourceObject() throws IOException { + var mapperService = createMapperService(mapping(b -> { + // obj + b.startObject("obj"); + b.startObject("properties"); + + // field + b.startObject("field"); + b.field("type", "patch"); + b.endObject(); + + // obj.another_field + b.startObject("another_field"); + b.field("type", "keyword"); + b.endObject(); + + b.endObject(); + b.endObject(); + + // another_field + b.startObject("another_field"); + b.field("type", "keyword"); + b.endObject(); + })); + assertSourcePatch( + mapperService, + Map.of("obj", Map.of("field", Map.of("key1", "value1")), "another_field", randomAlphaOfLengthBetween(5, 10)) + ); + } + + public void testPatchSourceObjectFlat() throws IOException { + var mapperService = createMapperService(mapping(b -> { + // obj + b.startObject("obj"); + b.startObject("properties"); + + // obj.field + 
b.startObject("field"); + b.field("type", "patch"); + b.endObject(); + + // obj.another_field + b.startObject("another_field"); + b.field("type", "keyword"); + b.endObject(); + + b.endObject(); + b.endObject(); + + // another_field + b.startObject("another_field"); + b.field("type", "keyword"); + b.endObject(); + })); + assertSourcePatch(mapperService, Map.of("obj.field", Map.of("key1", "value1"), "another_field", randomAlphaOfLengthBetween(5, 10))); + } + + public void testPatchSourceNestedObject() throws IOException { + var mapperService = createMapperService(mapping(b -> { + // nested + b.startObject("nested"); + b.field("type", "nested"); + b.startObject("properties"); + + // nested.field + b.startObject("field"); + b.field("type", "patch"); + b.endObject(); + + // nested.another_field + b.startObject("another_field"); + b.field("type", "keyword"); + b.endObject(); + + b.endObject(); + b.endObject(); + + // another_field + b.startObject("another_field"); + b.field("type", "keyword"); + b.endObject(); + })); + assertSourcePatch( + mapperService, + Map.of("nested", Map.of("field", Map.of("key1", "value1")), "another_field", randomAlphaOfLengthBetween(5, 10)) + ); + } + + public void testPatchSourceNestedArray() throws IOException { + var mapperService = createMapperService(mapping(b -> { + // nested + b.startObject("nested"); + b.field("type", "nested"); + b.startObject("properties"); + + // nested.field + b.startObject("field"); + b.field("type", "patch"); + b.endObject(); + + // nested.another_field + b.startObject("another_field"); + b.field("type", "keyword"); + b.endObject(); + + b.endObject(); + b.endObject(); + + // another_field + b.startObject("another_field"); + b.field("type", "keyword"); + b.endObject(); + })); + assertSourcePatch( + mapperService, + Map.of( + "nested", + List.of( + Map.of("field", Map.of()), + Map.of(), + Map.of("field", Map.of("key1", "value1")), + Map.of("another_field", randomAlphaOfLengthBetween(5, 10)) + ), + "another_field", + randomAlphaOfLengthBetween(5, 10) + ) + ); + } + + public void testPatchSourceMulti() throws IOException { + var mapperService = createMapperService(mapping(b -> { + // field + b.startObject("field"); + b.field("type", "patch"); + b.endObject(); + + // obj + b.startObject("obj"); + b.startObject("properties"); + + // obj.field + b.startObject("field"); + b.field("type", "patch"); + b.endObject(); + + // obj.another_field + b.startObject("another_field"); + b.field("type", "keyword"); + b.endObject(); + + b.endObject(); + b.endObject(); + + // nested + b.startObject("nested"); + b.field("type", "nested"); + b.startObject("properties"); + + // nested.field + b.startObject("field"); + b.field("type", "patch"); + b.endObject(); + + // nested.another_field + b.startObject("another_field"); + b.field("type", "keyword"); + b.endObject(); + + b.endObject(); + b.endObject(); + + // another_field + b.startObject("another_field"); + b.field("type", "keyword"); + b.endObject(); + })); + assertSourcePatch( + mapperService, + Map.of( + "field", + Map.of("obj", Map.of("key1", "value1")), + "obj", + Map.of("field", Map.of("key1", "value1")), + "nested", + Map.of("field", Map.of("key1", "value1")), + "another_field", + randomAlphaOfLengthBetween(5, 10) + ) + ); + } + + public void testPatchSourceMultiFlat() throws IOException { + var mapperService = createMapperService(mapping(b -> { + // field + b.startObject("field"); + b.field("type", "patch"); + b.endObject(); + + // obj + b.startObject("obj"); + b.startObject("properties"); + + // obj.field 
+ b.startObject("field"); + b.field("type", "patch"); + b.endObject(); + + b.endObject(); + b.endObject(); + + // nested + b.startObject("nested"); + b.field("type", "nested"); + b.startObject("properties"); + + // nested.field + b.startObject("field"); + b.field("type", "patch"); + b.endObject(); + + // nested.another_field + b.startObject("another_field"); + b.field("type", "keyword"); + b.endObject(); + + b.endObject(); + b.endObject(); + + // another_field + b.startObject("another_field"); + b.field("type", "keyword"); + b.endObject(); + })); + assertSourcePatch( + mapperService, + Map.of( + "field", + Map.of("obj", Map.of("key1", "value1")), + "obj.field", + Map.of("key1", "value1"), + "nested.field", + Map.of("key1", "value1"), + "another_field", + randomAlphaOfLengthBetween(5, 10) + ) + ); + } + + public static void assertSourcePatch(MapperService mapperService, Map source) throws IOException { + XContentBuilder builder = JsonXContent.contentBuilder(); + builder.value(source); + SourceToParse origSource = new SourceToParse("0", BytesReference.bytes(builder), builder.contentType()); + ParsedDocument doc = mapperService.documentMapper().parse(origSource); + var storedSource = doc.rootDoc().getField(SourceFieldMapper.NAME).binaryValue(); + assertThat(storedSource.length, lessThan(origSource.source().length())); + assertFalse(storedSource.utf8ToString().equals(origSource.source().utf8ToString())); + withLuceneIndex(mapperService, iw -> iw.addDocuments(doc.docs()), ir -> { + Source actual = SourceProvider.fromLookup(mapperService.mappingLookup(), mapperService.getMapperMetrics().sourceFieldMetrics()) + .getSource(ir.leaves().get(0), doc.docs().size() - 1); + assertEquals(origSource.source().utf8ToString(), actual.internalSourceRef().utf8ToString()); + }); + } + + static class TestPlugin extends Plugin implements MapperPlugin { + @Override + public Map getMappers() { + return Map.of(BinaryPatchSourceFieldMapper.CONTENT_TYPE, BinaryPatchSourceFieldMapper.PARSER); + } + } + + static class BinaryPatchSourceFieldMapper extends FieldMapper { + static final TypeParser PARSER = new TypeParser((n, c) -> new Builder(n)); + static final String CONTENT_TYPE = "patch"; + + static class Builder extends FieldMapper.Builder { + protected Builder(String name) { + super(name); + } + + @Override + protected Parameter[] getParameters() { + return new Parameter[0]; + } + + @Override + public FieldMapper build(MapperBuilderContext context) { + BinaryFieldMapper.Builder b = new BinaryFieldMapper.Builder(leafName(), false).docValues(true); + return new BinaryPatchSourceFieldMapper(leafName(), b.build(context), builderParams(b, context)); + } + } + + protected BinaryPatchSourceFieldMapper(String simpleName, BinaryFieldMapper delegate, BuilderParams builderParams) { + super(simpleName, delegate.fieldType(), builderParams); + } + + @Override + protected void parseCreateField(DocumentParserContext context) throws IOException { + context.addSourceFieldPatch(this, context.parser().getTokenLocation()); + XContentBuilder b = XContentBuilder.builder(context.parser().contentType().xContent()); + b.copyCurrentStructure(context.parser()); + context.doc().add(new BinaryDocValuesField(fullPath(), BytesReference.bytes(b).toBytesRef())); + context.parser().skipChildren(); + } + + @Override + protected String contentType() { + return CONTENT_TYPE; + } + + @Override + public Builder getMergeBuilder() { + return new Builder(leafName()); + } + + @Override + protected SyntheticSourceSupport syntheticSourceSupport() { + var 
fieldLoader = new BinaryDocValuesSyntheticFieldLoader(fullPath()) { + @Override + protected void writeValue(XContentBuilder b, BytesRef value) throws IOException { + try (var stream = new BytesArray(value.utf8ToString()).streamInput()) { + b.rawField(leafName(), stream, b.contentType()); + } + } + }; + return new SyntheticSourceSupport.Native(fieldLoader); + } + + @Override + protected SourceLoader.PatchFieldLoader patchFieldLoader() { + return new SourceLoader.SyntheticPatchFieldLoader(syntheticFieldLoader()); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/mapper/SourceFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/SourceFieldMapperTests.java index 2f417e688cb97..916de79a66be2 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/SourceFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/SourceFieldMapperTests.java @@ -22,7 +22,9 @@ import org.elasticsearch.test.index.IndexVersionUtils; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentLocation; import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; @@ -31,6 +33,7 @@ import java.util.Map; import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_SOURCE_ENABLED_SETTING; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -545,4 +548,77 @@ public void testRecoverySourceWithTimeSeriesCustom() throws IOException { assertNull(doc.rootDoc().getField("_recovery_source")); } } + + public void testWrongLocationPatchSourceInPostParse() throws Exception { + MapperService mapperService = createMapperService(Settings.EMPTY, mapping(b -> { + b.startObject("field"); + b.field("type", "long"); + b.endObject(); + })); + SourceFieldMapper mapper = (SourceFieldMapper) mapperService.getMetadataMappers().get(SourceFieldMapper.class); + XContentBuilder builder = JsonXContent.contentBuilder(); + builder.value(Map.of("field", 45)); + var sourceToParse = new SourceToParse("0", BytesReference.bytes(builder), builder.contentType()); + FieldMapper fieldMapper = (FieldMapper) mapperService.mappingLookup().getMapper("field"); + try ( + var parser = XContentHelper.createParserNotCompressed( + XContentParserConfiguration.EMPTY, + sourceToParse.source(), + XContentType.JSON + ) + ) { + DocumentParserContext context = new TestDocumentParserContext(MappingLookup.EMPTY, sourceToParse) { + @Override + public XContentParser parser() { + return parser; + } + }; + var xContentLocation = new XContentLocation(0, 3); + context.addSourceFieldPatch(fieldMapper, xContentLocation); + var exc = expectThrows(IllegalArgumentException.class, () -> mapper.postParse(context)); + assertThat(exc.getMessage(), containsString("Cannot find patch")); + } + } + + public void testRemainingPatchSourceInPostParse() throws Exception { + MapperService mapperService = createMapperService(Settings.EMPTY, mapping(b -> { + b.startObject("field"); + b.field("type", "long"); + b.endObject(); + + b.startObject("another_field"); + b.field("type", "long"); + b.endObject(); + })); + SourceFieldMapper mapper = (SourceFieldMapper) mapperService.getMetadataMappers().get(SourceFieldMapper.class); + XContentBuilder 
builder = JsonXContent.contentBuilder(); + builder.value(Map.of("another_field", 45)); + var sourceToParse = new SourceToParse("0", BytesReference.bytes(builder), builder.contentType()); + FieldMapper fieldMapper = (FieldMapper) mapperService.mappingLookup().getMapper("field"); + try ( + var parser = XContentHelper.createParserNotCompressed( + XContentParserConfiguration.EMPTY, + sourceToParse.source(), + XContentType.JSON + ) + ) { + DocumentParserContext context = new TestDocumentParserContext(MappingLookup.EMPTY, sourceToParse) { + @Override + public XContentParser parser() { + return parser; + } + }; + var xContentLocation1 = new XContentLocation(0, 3); + context.addSourceFieldPatch(fieldMapper, xContentLocation1); + { + var exc = expectThrows(IllegalArgumentException.class, () -> context.addSourceFieldPatch(fieldMapper, xContentLocation1)); + assertThat(exc.getMessage(), containsString(xContentLocation1.toString())); + } + var xContentLocation2 = new XContentLocation(2, 6); + context.addSourceFieldPatch(fieldMapper, xContentLocation2); + + var exc = expectThrows(IllegalArgumentException.class, () -> mapper.postParse(context)); + assertThat(exc.getMessage(), allOf(containsString(xContentLocation2.toString()), containsString(xContentLocation1.toString()))); + } + } }
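
For reviewers, a rough read-side sketch of the contract this patch establishes (illustrative only, not part of the change; leafReader and docId are assumed to come from the caller, and "field" stands in for any mapped field that registered a patch). The id that replaces the value inside the stored _source is also indexed as a numeric doc-values field whose name comes from SourceFieldMapper#getPatchFieldName, which is how SyntheticPatchFieldLoader locates it:

    String patchDvField = SourceFieldMapper.getPatchFieldName("field"); // resolves to "_source_patch.field"
    NumericDocValues dv = leafReader.getNumericDocValues(patchDvField);
    if (dv != null && dv.advanceExact(docId)) {
        int patchId = (int) dv.longValue(); // the same id that replaced the field value in the stored _source
    }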