diff --git a/docs/changelog/116942.yaml b/docs/changelog/116942.yaml new file mode 100644 index 0000000000000..5037e8c59cd85 --- /dev/null +++ b/docs/changelog/116942.yaml @@ -0,0 +1,5 @@ +pr: 116942 +summary: Fix handling of bulk requests with semantic text fields and delete ops +area: Relevance +type: bug +issues: [] diff --git a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java index 8da1aaabd517a..eff1a0111414a 100644 --- a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java +++ b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -30,8 +31,10 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Locale; import java.util.Map; +import java.util.Set; import static org.hamcrest.Matchers.equalTo; @@ -86,30 +89,38 @@ public void testBulkOperations() throws Exception { int totalBulkReqs = randomIntBetween(2, 100); long totalDocs = 0; + Set ids = new HashSet<>(); for (int bulkReqs = 0; bulkReqs < totalBulkReqs; bulkReqs++) { BulkRequestBuilder bulkReqBuilder = client().prepareBulk(); int totalBulkSize = randomIntBetween(1, 100); for (int bulkSize = 0; bulkSize < totalBulkSize; bulkSize++) { - String id = Long.toString(totalDocs); + if (ids.size() > 0 && rarely(random())) { + String id = randomFrom(ids); + ids.remove(id); + DeleteRequestBuilder request = new DeleteRequestBuilder(client(), INDEX_NAME).setId(id); + bulkReqBuilder.add(request); + continue; + } + String id = Long.toString(totalDocs++); boolean isIndexRequest = randomBoolean(); Map source = new HashMap<>(); source.put("sparse_field", isIndexRequest && rarely() ? null : randomAlphaOfLengthBetween(0, 1000)); source.put("dense_field", isIndexRequest && rarely() ? null : randomAlphaOfLengthBetween(0, 1000)); if (isIndexRequest) { bulkReqBuilder.add(new IndexRequestBuilder(client()).setIndex(INDEX_NAME).setId(id).setSource(source)); - totalDocs++; + ids.add(id); } else { boolean isUpsert = randomBoolean(); UpdateRequestBuilder request = new UpdateRequestBuilder(client()).setIndex(INDEX_NAME).setDoc(source); - if (isUpsert || totalDocs == 0) { + if (isUpsert || ids.size() == 0) { request.setDocAsUpsert(true); - totalDocs++; } else { // Update already existing document - id = Long.toString(randomLongBetween(0, totalDocs - 1)); + id = randomFrom(ids); } request.setId(id); bulkReqBuilder.add(request); + ids.add(id); } } BulkResponse bulkResponse = bulkReqBuilder.get(); @@ -134,7 +145,7 @@ public void testBulkOperations() throws Exception { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true); SearchResponse searchResponse = client().search(new SearchRequest(INDEX_NAME).source(sourceBuilder)).get(); try { - assertThat(searchResponse.getHits().getTotalHits().value, equalTo(totalDocs)); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) ids.size())); } finally { searchResponse.decRef(); } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java index 13ce090f69fc6..3a27fbc27a955 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java @@ -26,6 +26,10 @@ public Set getFeatures() { @Override public Set getTestFeatures() { - return Set.of(SemanticTextFieldMapper.SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX, SemanticTextFieldMapper.SEMANTIC_TEXT_ZERO_SIZE_FIX); + return Set.of( + SemanticTextFieldMapper.SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX, + SemanticTextFieldMapper.SEMANTIC_TEXT_ZERO_SIZE_FIX, + SemanticTextFieldMapper.SEMANTIC_TEXT_DELETE_FIX + ); } } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java index f5ff04ab7c15f..50c1f1db107ab 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java @@ -412,8 +412,8 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons */ private Map> createFieldInferenceRequests(BulkShardRequest bulkShardRequest) { Map> fieldRequestsMap = new LinkedHashMap<>(); - int itemIndex = 0; - for (var item : bulkShardRequest.items()) { + for (int itemIndex = 0; itemIndex < bulkShardRequest.items().length; itemIndex++) { + var item = bulkShardRequest.items()[itemIndex]; if (item.getPrimaryResponse() != null) { // item was already aborted/processed by a filter in the chain upstream (e.g. security) continue; @@ -440,6 +440,7 @@ private Map> createFieldInferenceRequests(Bu // ignore delete request continue; } + final Map docMap = indexRequest.sourceAsMap(); for (var entry : fieldInferenceMap.values()) { String field = entry.getName(); @@ -481,7 +482,6 @@ private Map> createFieldInferenceRequests(Bu } } } - itemIndex++; } return fieldRequestsMap; } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java index 1fff6ffaad025..8dd5145fbf8dd 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java @@ -83,6 +83,7 @@ public class SemanticTextFieldMapper extends FieldMapper implements InferenceFieldMapper { public static final NodeFeature SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX = new NodeFeature("semantic_text.in_object_field_fix"); public static final NodeFeature SEMANTIC_TEXT_ZERO_SIZE_FIX = new NodeFeature("semantic_text.zero_size_fix"); + public static final NodeFeature SEMANTIC_TEXT_DELETE_FIX = new NodeFeature("semantic_text.delete_fix"); public static final String CONTENT_TYPE = "semantic_text"; diff --git a/x-pack/plugin/inference/src/yamlRestTest/resources/rest-api-spec/test/inference/30_semantic_text_inference.yml b/x-pack/plugin/inference/src/yamlRestTest/resources/rest-api-spec/test/inference/30_semantic_text_inference.yml index ae32ed1e4dcb9..e2f29d4ebd628 100644 --- a/x-pack/plugin/inference/src/yamlRestTest/resources/rest-api-spec/test/inference/30_semantic_text_inference.yml +++ b/x-pack/plugin/inference/src/yamlRestTest/resources/rest-api-spec/test/inference/30_semantic_text_inference.yml @@ -502,3 +502,55 @@ setup: - match: { _source.level_1.dense_field.text: "another inference test" } - exists: _source.level_1.dense_field.inference.chunks.0.embeddings - match: { _source.level_1.dense_field.inference.chunks.0.text: "another inference test" } + +--- +"Deletes on bulk operation": + - requires: + cluster_features: semantic_text.delete_fix + reason: Delete operations are properly applied when subsequent operations include a semantic text field. + + - do: + bulk: + index: test-index + refresh: true + body: | + {"index":{"_id": "1"}} + {"dense_field": ["you know, for testing", "now with chunks"]} + {"index":{"_id": "2"}} + {"dense_field": ["some more tests", "that include chunks"]} + + - do: + search: + index: test-index + body: + query: + semantic: + field: dense_field + query: "you know, for testing" + + - match: { hits.total.value: 2 } + - match: { hits.total.relation: eq } + - match: { hits.hits.0._source.dense_field.text: ["you know, for testing", "now with chunks"] } + - match: { hits.hits.1._source.dense_field.text: ["some more tests", "that include chunks"] } + + - do: + bulk: + index: test-index + refresh: true + body: | + {"delete":{ "_id": "2"}} + {"update":{"_id": "1"}} + {"doc":{"dense_field": "updated text"}} + + - do: + search: + index: test-index + body: + query: + semantic: + field: dense_field + query: "you know, for testing" + + - match: { hits.total.value: 1 } + - match: { hits.total.relation: eq } + - match: { hits.hits.0._source.dense_field.text: "updated text" }