Skip to content

Commit

Permalink
Add a way to replace the value for a specific field in the original s…
Browse files Browse the repository at this point in the history
…ource with a patch id
  • Loading branch information
jimczi committed Sep 17, 2024
1 parent be076e6 commit da38044
Show file tree
Hide file tree
Showing 10 changed files with 994 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.IndexAnalyzers;
import org.elasticsearch.index.mapper.MapperService.MergeReason;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xcontent.FilterXContentParserWrapper;
import org.elasticsearch.xcontent.FlatteningXContentParser;
import org.elasticsearch.xcontent.XContentLocation;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
Expand All @@ -36,6 +38,8 @@
* the lucene data structures and mappings to be dynamically created as the outcome of parsing a document.
*/
public abstract class DocumentParserContext {
public record XContentPatch(String fullPath, XContentLocation location, int id) {}

/**
* Wraps a given context while allowing to override some of its behaviour by re-implementing some of the non final methods
*/
Expand Down Expand Up @@ -124,6 +128,7 @@ public int get() {
* in this document and therefore is not present in mapping yet.
*/
private final Set<String> copyToFields;
private final Map<XContentLocation, XContentPatch> sourcePatches;

// Indicates if the source for this context has been cloned and gets parsed multiple times.
private boolean clonedSource;
Expand All @@ -146,6 +151,7 @@ private DocumentParserContext(
Set<String> fieldsAppliedFromTemplates,
Set<String> copyToFields,
DynamicMapperSize dynamicMapperSize,
Map<XContentLocation, XContentPatch> sourcePatches,
boolean clonedSource
) {
this.mappingLookup = mappingLookup;
Expand All @@ -165,6 +171,7 @@ private DocumentParserContext(
this.fieldsAppliedFromTemplates = fieldsAppliedFromTemplates;
this.copyToFields = copyToFields;
this.dynamicMappersSize = dynamicMapperSize;
this.sourcePatches = sourcePatches;
this.clonedSource = clonedSource;
}

Expand All @@ -187,6 +194,7 @@ private DocumentParserContext(ObjectMapper parent, ObjectMapper.Dynamic dynamic,
in.fieldsAppliedFromTemplates,
in.copyToFields,
in.dynamicMappersSize,
in.sourcePatches,
in.clonedSource
);
}
Expand Down Expand Up @@ -216,6 +224,7 @@ protected DocumentParserContext(
new HashSet<>(),
new HashSet<>(mappingLookup.fieldTypesLookup().getCopyToDestinationFields()),
new DynamicMapperSize(),
new HashMap<>(),
false
);
}
Expand Down Expand Up @@ -300,6 +309,36 @@ public final void addToFieldNames(String field) {
}
}

    /**
     * Registers a patch for a specific field at a given location in the original source.
     * The patch modifies the field value in the original {@code _source} by replacing it with a numerical ID,
     * which is then stored in a document field.
     * NOTE(review): the previous javadoc linked {@link SearchSourceBuilder#equals(Object)} as the accessor for
     * that document field, which looks like an accidental autocomplete reference — confirm the intended member.
     *
     * The original field value must still be retrievable when the original {@code _source} is requested.
     * This responsibility lies with the {@link FieldMapper}, ensuring that the patched field can provide
     * the original value during retrieval.
     *
     * Note: Only one patch per field is permitted for each document.
     *
     * This is a no-op when the mapping has no {@link SourceFieldMapper} metadata mapper.
     *
     * @param fieldMapper The {@link FieldMapper} responsible for applying the patch to the field.
     * @param xContentLocation The {@link XContentLocation} representing the location of the field in the original source.
     */
    public void addSourceFieldPatch(FieldMapper fieldMapper, XContentLocation xContentLocation) {
        var sourceFieldMapper = (SourceFieldMapper) getMetadataMapper(SourceFieldMapper.NAME);
        // No _source metadata mapper: nothing to patch, silently skip.
        if (sourceFieldMapper == null) {
            return;
        }
        sourceFieldMapper.indexFieldPatch(doc(), fieldMapper, xContentLocation, getSourcePatches());
    }

    /**
     * Returns all {@link XContentPatch} currently registered in this context,
     * mapped by their {@link XContentLocation} in the original {@code _source}.
     * The returned map is the live, mutable map held by this context (not a copy).
     */
    public Map<XContentLocation, XContentPatch> getSourcePatches() {
        return sourcePatches;
    }

    // Returns the version field of the document being parsed, or null if not set yet.
    public final Field version() {
        return this.version;
    }
Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/elasticsearch/index/mapper/Mapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentLocation;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -175,6 +176,16 @@ public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() {
throw new IllegalArgumentException("field [" + fullPath() + "] of type [" + typeName() + "] doesn't support synthetic source");
}

    /**
     * Creates a {@link SourceLoader.PatchFieldLoader} to load patches that were previously indexed
     * with {@link DocumentParserContext#addSourceFieldPatch(FieldMapper, XContentLocation)}.
     *
     * The default implementation returns {@code null}; mappers that support source patching
     * override this method.
     *
     * @return a patch loader, or {@code null} if the field doesn't allow patching
     */
    protected SourceLoader.PatchFieldLoader patchFieldLoader() {
        return null;
    }

@Override
public String toString() {
return Strings.toString(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() {
return root.syntheticFieldLoader(stream);
}

    /**
     * Creates a {@link SourceLoader.PatchFieldLoader} for the whole mapping by delegating to the
     * root object mapper. Returns {@code null} if no field in the mapping supports patching.
     */
    protected SourceLoader.PatchFieldLoader patchFieldLoader() {
        return root.patchFieldLoader();
    }

/**
* Merges a new mapping into the existing one.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.index.mapper;

import org.apache.lucene.index.LeafReader;
import org.apache.lucene.search.CheckedIntConsumer;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
Expand Down Expand Up @@ -376,6 +377,31 @@ protected MapperMergeContext createChildContext(MapperMergeContext mapperMergeCo
);
}

@Override
protected SourceLoader.PatchFieldLoader patchFieldLoader() {
var patchLoader = super.patchFieldLoader();
if (patchLoader == null) {
return null;
}
return context -> {
try {
var patchFieldLoader = patchLoader.leaf(context);
if (patchFieldLoader == null) {
return null;
}
IndexSearcher searcher = new IndexSearcher(context.reader());
searcher.setQueryCache(null);
var childScorer = searcher.createWeight(nestedTypeFilter, ScoreMode.COMPLETE_NO_SCORES, 1f).scorer(context);
var parentsDocs = bitsetProducer.apply(parentTypeFilter).getBitSet(context);
return (doc, acc) -> {
collectChildren(nestedTypePath, doc, parentsDocs, childScorer.iterator(), child -> patchFieldLoader.load(child, acc));
};
} catch (IOException e) {
throw new RuntimeException(e);
}
};
}

@Override
public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() {
if (storeArraySource()) {
Expand Down Expand Up @@ -427,29 +453,15 @@ public DocValuesLoader docValuesLoader(LeafReader leafReader, int[] docIdsInLeaf
if (childScorer != null) {
var parentDocs = parentBitSetProducer.get().getBitSet(leafReader.getContext());
return parentDoc -> {
collectChildren(parentDoc, parentDocs, childScorer.iterator());
children.clear();
collectChildren(nestedTypePath, parentDoc, parentDocs, childScorer.iterator(), child -> children.add(child));
return children.size() > 0;
};
} else {
return parentDoc -> false;
}
}

private List<Integer> collectChildren(int parentDoc, BitSet parentDocs, DocIdSetIterator childIt) throws IOException {
assert parentDoc < 0 || parentDocs.get(parentDoc) : "wrong context, doc " + parentDoc + " is not a parent of " + nestedTypePath;
final int prevParentDoc = parentDoc > 0 ? parentDocs.prevSetBit(parentDoc - 1) : -1;
int childDocId = childIt.docID();
if (childDocId <= prevParentDoc) {
childDocId = childIt.advance(prevParentDoc + 1);
}

children.clear();
for (; childDocId < parentDoc; childDocId = childIt.nextDoc()) {
children.add(childDocId);
}
return children;
}

@Override
public boolean hasValue() {
return children.size() > 0;
Expand Down Expand Up @@ -477,4 +489,24 @@ public String fieldName() {
return NestedObjectMapper.this.fullPath();
}
}

private static void collectChildren(
String nestedTypePath,
int parentDoc,
BitSet parentDocs,
DocIdSetIterator childIt,
CheckedIntConsumer<IOException> childConsumer
) throws IOException {
assert parentDoc < 0 || parentDocs.get(parentDoc) : "wrong context, doc " + parentDoc + " is not a parent of " + nestedTypePath;
final int prevParentDoc = parentDoc > 0 ? parentDocs.prevSetBit(parentDoc - 1) : -1;
int childDocId = childIt.docID();
if (childDocId <= prevParentDoc) {
childDocId = childIt.advance(prevParentDoc + 1);
}

for (; childDocId < parentDoc; childDocId = childIt.nextDoc()) {
childConsumer.accept(childDocId);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ObjectMapper extends Mapper {
Expand Down Expand Up @@ -808,6 +809,31 @@ protected void doXContent(XContentBuilder builder, Params params) throws IOExcep

}

@Override
protected SourceLoader.PatchFieldLoader patchFieldLoader() {
var loaders = mappers.values().stream().map(m -> m.patchFieldLoader()).filter(l -> l != null).collect(Collectors.toList());
if (loaders.isEmpty()) {
return null;
}
return context -> {
final List<SourceLoader.PatchFieldLoader.Leaf> leaves = new ArrayList<>();
for (var loader : loaders) {
var leaf = loader.leaf(context);
if (leaf != null) {
leaves.add(leaf);
}
}
if (leaves.isEmpty()) {
return null;
}
return (doc, acc) -> {
for (var leaf : leaves) {
leaf.load(doc, acc);
}
};
};
}

protected SourceLoader.SyntheticFieldLoader syntheticFieldLoader(Stream<Mapper> mappers, boolean isFragment) {
var fields = mappers.sorted(Comparator.comparing(Mapper::fullPath))
.map(Mapper::syntheticFieldLoader)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.mapper;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.xcontent.XContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentGenerator;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;

import java.io.IOException;
import java.util.Set;

/**
 * Utilities to rewrite an {@code _source} document, replacing the values of selected fields.
 */
public class PatchSourceUtils {

    /** A three-argument consumer that may throw an {@link IOException}. */
    public interface CheckedTriConsumer<S, T, U> {
        void apply(S s, T t, U u) throws IOException;
    }

    // Utility class: no instances.
    private PatchSourceUtils() {}

    /**
     * Parses the given {@code source} and returns a new version with all values referenced by the
     * provided {@code patchFullPaths} replaced using the {@code patchApply} consumer.
     *
     * @param source         the original source bytes
     * @param xContent       the content type used to parse {@code source} and render the result
     * @param patchFullPaths the set of full dotted field paths whose values must be replaced
     * @param patchApply     consumer invoked with (fullPath, parser positioned on the value, generator);
     *                       it is responsible for consuming the value and writing the replacement
     * @return the patched source, or the original {@code source} unchanged when it is empty
     */
    public static BytesReference patchSource(
        BytesReference source,
        XContent xContent,
        Set<String> patchFullPaths,
        CheckedTriConsumer<String, XContentParser, XContentGenerator> patchApply
    ) throws IOException {
        BytesStreamOutput streamOutput = new BytesStreamOutput();
        XContentBuilder builder = new XContentBuilder(xContent, streamOutput);
        try (XContentParser parser = XContentHelper.createParserNotCompressed(XContentParserConfiguration.EMPTY, source, xContent.type())) {
            // Empty source: nothing to rewrite, return the input as-is.
            if ((parser.currentToken() == null) && (parser.nextToken() == null)) {
                return source;
            }
            parseAndPatchSource(builder.generator(), parser, "", patchFullPaths, patchApply);
            return BytesReference.bytes(builder);
        }
    }

    /**
     * Recursively copies the current structure from {@code parser} to {@code destination},
     * delegating to {@code patchApply} whenever a field's full path is in {@code patchFullPaths}.
     */
    private static void parseAndPatchSource(
        XContentGenerator destination,
        XContentParser parser,
        String fullPath,
        Set<String> patchFullPaths,
        CheckedTriConsumer<String, XContentParser, XContentGenerator> patchApply
    ) throws IOException {
        XContentParser.Token token = parser.currentToken();
        if (token == XContentParser.Token.FIELD_NAME) {
            String fieldName = parser.currentName();
            destination.writeFieldName(fieldName);
            // Advance to the field's value before deciding whether to patch or copy it.
            token = parser.nextToken();
            fullPath = fullPath + (fullPath.isEmpty() ? "" : ".") + fieldName;
            if (patchFullPaths.contains(fullPath)) {
                patchApply.apply(fullPath, parser, destination);
                return;
            }
        }

        switch (token) {
            case START_ARRAY -> {
                destination.writeStartArray();
                while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
                    // Fix: recurse with the actual parameter name (was an undefined `patchFields`).
                    parseAndPatchSource(destination, parser, fullPath, patchFullPaths, patchApply);
                }
                destination.writeEndArray();
            }
            case START_OBJECT -> {
                destination.writeStartObject();
                while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
                    parseAndPatchSource(destination, parser, fullPath, patchFullPaths, patchApply);
                }
                destination.writeEndObject();
            }
            default -> // others are simple:
                destination.copyCurrentEvent(parser);
        }
    }
}
Loading

0 comments on commit da38044

Please sign in to comment.