[feature/semantic_text] Register semantic text sub fields in the mapping (elastic#106560)

This PR refactors the semantic text field mapper to register its sub fields in the mapping instead of re-creating them each time a document is parsed.
It also fixes the generation of these fields when the semantic text field is defined inside an object field.
Lastly, this change adds a new model_settings section to the field parameters, which the field mapper updates when inference results are received from a bulk action. The model settings become available in the field mapping as soon as the first document with the inference field is ingested, and they are used to validate updates, ensuring consistency between what is used in the bulk action and what is defined in the field mapping.
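For illustration only, here is a minimal sketch of the kind of source-map augmentation the bulk filter performs before a document reaches the mapper: inference results and the model settings that produced them are grouped under the _inference metadata field. The inner keys used here (model_settings, task_type, dimensions, results) are hypothetical placeholders; the real layout is owned by InferenceMetadataFieldMapper.applyFieldInference in the diff below.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public final class InferenceSourceSketch {
        // The real constant is InferenceMetadataFieldMapper.NAME ("_inference").
        private static final String INFERENCE_FIELD = "_inference";

        // Attach hypothetical inference results and model settings for one semantic_text field
        // to the document source map, mirroring the shape the bulk action filter writes.
        public static Map<String, Object> withInference(Map<String, Object> docSource, String field, float[] embedding) {
            Map<String, Object> modelSettings = new LinkedHashMap<>();
            modelSettings.put("task_type", "text_embedding");  // hypothetical key
            modelSettings.put("dimensions", embedding.length); // hypothetical key
            List<Float> values = new ArrayList<>(embedding.length);
            for (float v : embedding) {
                values.add(v);
            }
            Map<String, Object> fieldEntry = new LinkedHashMap<>();
            fieldEntry.put("model_settings", modelSettings);   // hypothetical key
            fieldEntry.put("results", values);                 // hypothetical key
            Map<String, Object> inferenceMap = new LinkedHashMap<>();
            inferenceMap.put(field, fieldEntry);
            docSource.put(INFERENCE_FIELD, inferenceMap);      // replaces any pre-existing inference map
            return docSource;
        }
    }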
jimczi authored Mar 22, 2024
1 parent 823fb58 commit d4e283d
Showing 18 changed files with 1,267 additions and 751 deletions.
@@ -555,7 +555,7 @@ public static Map<String, Object> nodeMapValue(Object node, String desc) {
        if (node instanceof Map) {
            return (Map<String, Object>) node;
        } else {
-           throw new ElasticsearchParseException(desc + " should be a hash but was of type: " + node.getClass());
+           throw new ElasticsearchParseException(desc + " should be a map but was of type: " + node.getClass());
        }
    }

@@ -1176,7 +1176,7 @@ public static final class Conflicts {
        private final String mapperName;
        private final List<String> conflicts = new ArrayList<>();

-       Conflicts(String mapperName) {
+       public Conflicts(String mapperName) {
            this.mapperName = mapperName;
        }

@@ -1188,7 +1188,11 @@ void addConflict(String parameter, String existing, String toMerge) {
            conflicts.add("Cannot update parameter [" + parameter + "] from [" + existing + "] to [" + toMerge + "]");
        }

-       void check() {
+       public boolean hasConflicts() {
+           return conflicts.isEmpty() == false;
+       }
+
+       public void check() {
            if (conflicts.isEmpty()) {
                return;
            }
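The newly public hasConflicts() lets callers probe for conflicts without the exception that check() throws. A usage sketch follows, assuming Conflicts is the nested FieldMapper.Conflicts helper; since addConflict stays package-private, code like this would have to live in the same mapper package, and the model_settings.task_type parameter name is purely illustrative.

    // Sketch only: reject an update whose model settings differ from what the mapping recorded.
    static void ensureSameTaskType(String fieldName, String existingTaskType, String updatedTaskType) {
        FieldMapper.Conflicts conflicts = new FieldMapper.Conflicts(fieldName);
        if (existingTaskType.equals(updatedTaskType) == false) {
            conflicts.addConflict("model_settings.task_type", existingTaskType, updatedTaskType);
        }
        if (conflicts.hasConflicts()) {
            // a caller could log or collect diagnostics here before failing
        }
        conflicts.check(); // throws, listing every recorded conflict
    }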
@@ -76,7 +76,7 @@ public CompressedXContent toCompressedXContent() {
    /**
     * Returns the root object for the current mapping
     */
-   RootObjectMapper getRoot() {
+   public RootObjectMapper getRoot() {
        return root;
    }

@@ -171,9 +171,12 @@ public void parse(DocumentParserContext context) throws IOException {
        }

        String feature = null;
+       boolean origIsWithLeafObject = context.path().isWithinLeafObject();
        try {
            // make sure that we don't expand dots in field names while parsing
-           context.path().setWithinLeafObject(true);
+           if (context.path().isWithinLeafObject() == false) {
+               context.path().setWithinLeafObject(true);
+           }
            for (Token token = context.parser().nextToken(); token != Token.END_OBJECT; token = context.parser().nextToken()) {
                if (token == Token.FIELD_NAME) {
                    feature = context.parser().currentName();
@@ -207,7 +210,7 @@ public void parse(DocumentParserContext context) throws IOException {
                context.addToFieldNames(fieldType().name());
            }
        } finally {
-           context.path().setWithinLeafObject(false);
+           context.path().setWithinLeafObject(origIsWithLeafObject);
        }
    }
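The two hunks above switch from force-resetting the flag to saving and restoring the caller's state, presumably because this parser can now run while an enclosing field has already entered a leaf-object scope. A generic sketch of the idiom, with a hypothetical Scope interface standing in for context.path():

    interface Scope {
        boolean isWithinLeafObject();
        void setWithinLeafObject(boolean value);
    }

    // Save-and-restore: the flag goes back to whatever the caller had, never a hard-coded false.
    static void runWithinLeafObject(Scope scope, Runnable parseBody) {
        boolean original = scope.isWithinLeafObject();
        try {
            if (original == false) {
                scope.setWithinLeafObject(true);
            }
            parseBody.run();
        } finally {
            scope.setWithinLeafObject(original);
        }
    }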

@@ -166,7 +166,7 @@ public static TestServiceSettings fromMap(Map<String, Object> map) {
        SimilarityMeasure similarity = null;
        String similarityStr = (String) map.remove("similarity");
        if (similarityStr != null) {
-           similarity = SimilarityMeasure.valueOf(similarityStr);
+           similarity = SimilarityMeasure.fromString(similarityStr);
        }

        return new TestServiceSettings(model, dimensions, similarity);
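Enum.valueOf only accepts the exact constant name, so a lower-case settings value such as "cosine" would make it throw IllegalArgumentException; a fromString-style factory that normalizes case is the usual fix, which is presumably why the test settings switch to SimilarityMeasure.fromString. A self-contained illustration with a stand-in enum (not the real SimilarityMeasure):

    import java.util.Locale;

    enum Measure {
        COSINE, DOT_PRODUCT;

        // Case-insensitive parsing, the behaviour a fromString-style factory typically provides.
        static Measure fromString(String name) {
            return valueOf(name.toUpperCase(Locale.ROOT));
        }
    }

    class MeasureDemo {
        public static void main(String[] args) {
            System.out.println(Measure.fromString("cosine")); // prints COSINE
            System.out.println(Measure.valueOf("cosine"));    // throws IllegalArgumentException
        }
    }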
@@ -55,7 +55,7 @@
import org.elasticsearch.xpack.inference.external.http.sender.HttpRequestSender;
import org.elasticsearch.xpack.inference.external.http.sender.RequestExecutorServiceSettings;
import org.elasticsearch.xpack.inference.logging.ThrottlerManager;
-import org.elasticsearch.xpack.inference.mapper.InferenceResultFieldMapper;
+import org.elasticsearch.xpack.inference.mapper.InferenceMetadataFieldMapper;
import org.elasticsearch.xpack.inference.mapper.SemanticTextFieldMapper;
import org.elasticsearch.xpack.inference.registry.ModelRegistry;
import org.elasticsearch.xpack.inference.rest.RestDeleteInferenceModelAction;
@@ -284,7 +284,7 @@ public Map<String, Mapper.TypeParser> getMappers() {

    @Override
    public Map<String, MetadataFieldMapper.TypeParser> getMetadataMappers() {
-       return Map.of(InferenceResultFieldMapper.NAME, InferenceResultFieldMapper.PARSER);
+       return Map.of(InferenceMetadataFieldMapper.NAME, InferenceMetadataFieldMapper.PARSER);
    }

    @Override
@@ -38,7 +38,7 @@
import org.elasticsearch.inference.Model;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
-import org.elasticsearch.xpack.inference.mapper.InferenceResultFieldMapper;
+import org.elasticsearch.xpack.inference.mapper.InferenceMetadataFieldMapper;
import org.elasticsearch.xpack.inference.registry.ModelRegistry;

import java.util.ArrayList;
@@ -50,7 +50,7 @@

/**
 * An {@link ActionFilter} that performs inference on {@link BulkShardRequest} asynchronously and stores the results in
- * the individual {@link BulkItemRequest}. The results are then consumed by the {@link InferenceResultFieldMapper}
+ * the individual {@link BulkItemRequest}. The results are then consumed by the {@link InferenceMetadataFieldMapper}
 * in the subsequent {@link TransportShardBulkAction} downstream.
 */
public class ShardBulkInferenceActionFilter implements MappedActionFilter {
@@ -267,10 +267,10 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons
            Map<String, Object> newDocMap = indexRequest.sourceAsMap();
            Map<String, Object> inferenceMap = new LinkedHashMap<>();
            // ignore the existing inference map if any
-           newDocMap.put(InferenceResultFieldMapper.NAME, inferenceMap);
+           newDocMap.put(InferenceMetadataFieldMapper.NAME, inferenceMap);
            for (FieldInferenceResponse fieldResponse : response.responses()) {
                try {
-                   InferenceResultFieldMapper.applyFieldInference(
+                   InferenceMetadataFieldMapper.applyFieldInference(
                        inferenceMap,
                        fieldResponse.field(),
                        fieldResponse.model(),
@@ -295,6 +295,7 @@ private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(Bu
                continue;
            }
            final Map<String, Object> docMap = indexRequest.sourceAsMap();
+           boolean hasInput = false;
            for (var entry : fieldInferenceMetadata.getFieldInferenceOptions().entrySet()) {
                String field = entry.getKey();
                String inferenceId = entry.getValue().inferenceId();
@@ -315,6 +316,7 @@ private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(Bu
                if (value instanceof String valueStr) {
                    List<FieldInferenceRequest> fieldRequests = fieldRequestsMap.computeIfAbsent(inferenceId, k -> new ArrayList<>());
                    fieldRequests.add(new FieldInferenceRequest(item.id(), field, valueStr));
+                   hasInput = true;
                } else {
                    inferenceResults.get(item.id()).failures.add(
                        new ElasticsearchStatusException(
@@ -326,6 +328,12 @@ private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(Bu
                    );
                }
            }
+           if (hasInput == false) {
+               // remove the existing _inference field (if present) since none of the content require inference.
+               if (docMap.remove(InferenceMetadataFieldMapper.NAME) != null) {
+                   indexRequest.source(docMap);
+               }
+           }
        }
        return fieldRequestsMap;
    }
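Taken together, the three hunks above track whether any value in the document actually needs inference; when nothing does, an _inference entry carried over from an earlier version of the document is stripped before indexing. A condensed sketch of that bookkeeping, using plain collections rather than the filter's real request classes:

    import java.util.List;
    import java.util.Map;

    final class InferenceInputSketch {
        private static final String INFERENCE_FIELD = "_inference"; // InferenceMetadataFieldMapper.NAME

        // Collects the String values that need inference; if there are none, drops any stale
        // inference metadata so it is not indexed alongside content that no longer requires it.
        static boolean collectInputs(Map<String, Object> docMap, List<String> inferenceFields, List<String> inputs) {
            boolean hasInput = false;
            for (String field : inferenceFields) {
                if (docMap.get(field) instanceof String valueStr) {
                    inputs.add(valueStr);
                    hasInput = true;
                }
            }
            if (hasInput == false) {
                docMap.remove(INFERENCE_FIELD);
            }
            return hasInput;
        }
    }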