diff --git a/docs/changelog/116942.yaml b/docs/changelog/116942.yaml new file mode 100644 index 0000000000000..5037e8c59cd85 --- /dev/null +++ b/docs/changelog/116942.yaml @@ -0,0 +1,5 @@ +pr: 116942 +summary: Fix handling of bulk requests with semantic text fields and delete ops +area: Relevance +type: bug +issues: [] diff --git a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java index 73c0f6d4c7685..3b0fc869c8124 100644 --- a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java +++ b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -30,8 +31,10 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Locale; import java.util.Map; +import java.util.Set; import static org.elasticsearch.xpack.inference.mapper.SemanticTextFieldTests.randomSemanticTextInput; import static org.hamcrest.Matchers.equalTo; @@ -87,30 +90,38 @@ public void testBulkOperations() throws Exception { int totalBulkReqs = randomIntBetween(2, 100); long totalDocs = 0; + Set ids = new HashSet<>(); for (int bulkReqs = 0; bulkReqs < totalBulkReqs; bulkReqs++) { BulkRequestBuilder bulkReqBuilder = client().prepareBulk(); int totalBulkSize = randomIntBetween(1, 100); for (int bulkSize = 0; bulkSize < totalBulkSize; bulkSize++) { - String id = Long.toString(totalDocs); + if (ids.size() > 0 && rarely(random())) { + String id = randomFrom(ids); + ids.remove(id); + DeleteRequestBuilder request = new DeleteRequestBuilder(client(), INDEX_NAME).setId(id); + bulkReqBuilder.add(request); + continue; + } + String id = Long.toString(totalDocs++); boolean isIndexRequest = randomBoolean(); Map source = new HashMap<>(); source.put("sparse_field", isIndexRequest && rarely() ? null : randomSemanticTextInput()); source.put("dense_field", isIndexRequest && rarely() ? null : randomSemanticTextInput()); if (isIndexRequest) { bulkReqBuilder.add(new IndexRequestBuilder(client()).setIndex(INDEX_NAME).setId(id).setSource(source)); - totalDocs++; + ids.add(id); } else { boolean isUpsert = randomBoolean(); UpdateRequestBuilder request = new UpdateRequestBuilder(client()).setIndex(INDEX_NAME).setDoc(source); - if (isUpsert || totalDocs == 0) { + if (isUpsert || ids.size() == 0) { request.setDocAsUpsert(true); - totalDocs++; } else { // Update already existing document - id = Long.toString(randomLongBetween(0, totalDocs - 1)); + id = randomFrom(ids); } request.setId(id); bulkReqBuilder.add(request); + ids.add(id); } } BulkResponse bulkResponse = bulkReqBuilder.get(); @@ -135,7 +146,7 @@ public void testBulkOperations() throws Exception { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true); SearchResponse searchResponse = client().search(new SearchRequest(INDEX_NAME).source(sourceBuilder)).get(); try { - assertThat(searchResponse.getHits().getTotalHits().value, equalTo(totalDocs)); + assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) ids.size())); } finally { searchResponse.decRef(); } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java index 9d3a263b506c9..8f3b65831b998 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java @@ -38,7 +38,8 @@ public Set getFeatures() { public Set getTestFeatures() { return Set.of( SemanticTextFieldMapper.SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX, - SemanticTextFieldMapper.SEMANTIC_TEXT_SINGLE_FIELD_UPDATE_FIX + SemanticTextFieldMapper.SEMANTIC_TEXT_SINGLE_FIELD_UPDATE_FIX, + SemanticTextFieldMapper.SEMANTIC_TEXT_DELETE_FIX ); } } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java index b3bbe3a7df9bc..dd59230e575c4 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java @@ -413,8 +413,8 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons */ private Map> createFieldInferenceRequests(BulkShardRequest bulkShardRequest) { Map> fieldRequestsMap = new LinkedHashMap<>(); - int itemIndex = 0; - for (var item : bulkShardRequest.items()) { + for (int itemIndex = 0; itemIndex < bulkShardRequest.items().length; itemIndex++) { + var item = bulkShardRequest.items()[itemIndex]; if (item.getPrimaryResponse() != null) { // item was already aborted/processed by a filter in the chain upstream (e.g. security) continue; @@ -441,6 +441,7 @@ private Map> createFieldInferenceRequests(Bu // ignore delete request continue; } + final Map docMap = indexRequest.sourceAsMap(); for (var entry : fieldInferenceMap.values()) { String field = entry.getName(); @@ -483,7 +484,6 @@ private Map> createFieldInferenceRequests(Bu } } } - itemIndex++; } return fieldRequestsMap; } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java index d70931a85c82e..86ba96e1369f0 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java @@ -90,8 +90,8 @@ public class SemanticTextFieldMapper extends FieldMapper implements InferenceFie public static final NodeFeature SEMANTIC_TEXT_SEARCH_INFERENCE_ID = new NodeFeature("semantic_text.search_inference_id"); public static final NodeFeature SEMANTIC_TEXT_DEFAULT_ELSER_2 = new NodeFeature("semantic_text.default_elser_2"); public static final NodeFeature SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX = new NodeFeature("semantic_text.in_object_field_fix"); - public static final NodeFeature SEMANTIC_TEXT_SINGLE_FIELD_UPDATE_FIX = new NodeFeature("semantic_text.single_field_update_fix"); + public static final NodeFeature SEMANTIC_TEXT_DELETE_FIX = new NodeFeature("semantic_text.delete_fix"); public static final String CONTENT_TYPE = "semantic_text"; public static final String DEFAULT_ELSER_2_INFERENCE_ID = DEFAULT_ELSER_ID; diff --git a/x-pack/plugin/inference/src/yamlRestTest/resources/rest-api-spec/test/inference/30_semantic_text_inference.yml b/x-pack/plugin/inference/src/yamlRestTest/resources/rest-api-spec/test/inference/30_semantic_text_inference.yml index 445df1dc302b9..1cf48417242c9 100644 --- a/x-pack/plugin/inference/src/yamlRestTest/resources/rest-api-spec/test/inference/30_semantic_text_inference.yml +++ b/x-pack/plugin/inference/src/yamlRestTest/resources/rest-api-spec/test/inference/30_semantic_text_inference.yml @@ -624,3 +624,55 @@ setup: - match: { _source.level_1.dense_field.text: "another inference test" } - exists: _source.level_1.dense_field.inference.chunks.0.embeddings - match: { _source.level_1.dense_field.inference.chunks.0.text: "another inference test" } + +--- +"Deletes on bulk operation": + - requires: + cluster_features: semantic_text.delete_fix + reason: Delete operations are properly applied when subsequent operations include a semantic text field. + + - do: + bulk: + index: test-index + refresh: true + body: | + {"index":{"_id": "1"}} + {"dense_field": ["you know, for testing", "now with chunks"]} + {"index":{"_id": "2"}} + {"dense_field": ["some more tests", "that include chunks"]} + + - do: + search: + index: test-index + body: + query: + semantic: + field: dense_field + query: "you know, for testing" + + - match: { hits.total.value: 2 } + - match: { hits.total.relation: eq } + - match: { hits.hits.0._source.dense_field.text: ["you know, for testing", "now with chunks"] } + - match: { hits.hits.1._source.dense_field.text: ["some more tests", "that include chunks"] } + + - do: + bulk: + index: test-index + refresh: true + body: | + {"delete":{ "_id": "2"}} + {"update":{"_id": "1"}} + {"doc":{"dense_field": "updated text"}} + + - do: + search: + index: test-index + body: + query: + semantic: + field: dense_field + query: "you know, for testing" + + - match: { hits.total.value: 1 } + - match: { hits.total.relation: eq } + - match: { hits.hits.0._source.dense_field.text: "updated text" }