From fa8905b38f3c069c08fea26095e2d7fb88aaee78 Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Mon, 18 Nov 2024 15:30:58 +0000 Subject: [PATCH] Fix handling of bulk requests with semantic text fields and delete ops (#116942) Previously, delete operations were not processed correctly when followed by operations containing semantic text fields. This issue caused the positions of subsequent operations in the items array to shift incorrectly by one. This PR resolves the discrepancy and includes additional tests to ensure proper behavior. --- docs/changelog/116942.yaml | 5 ++ .../ShardBulkInferenceActionFilterIT.java | 23 +++++--- .../xpack/inference/InferenceFeatures.java | 6 ++- .../ShardBulkInferenceActionFilter.java | 6 +-- .../mapper/SemanticTextFieldMapper.java | 1 + .../inference/30_semantic_text_inference.yml | 52 +++++++++++++++++++ 6 files changed, 83 insertions(+), 10 deletions(-) create mode 100644 docs/changelog/116942.yaml diff --git a/docs/changelog/116942.yaml b/docs/changelog/116942.yaml new file mode 100644 index 0000000000000..5037e8c59cd85 --- /dev/null +++ b/docs/changelog/116942.yaml @@ -0,0 +1,5 @@ +pr: 116942 +summary: Fix handling of bulk requests with semantic text fields and delete ops +area: Relevance +type: bug +issues: [] diff --git a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java index 8da1aaabd517a..eff1a0111414a 100644 --- a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java +++ b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -30,8 +31,10 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Locale; import java.util.Map; +import java.util.Set; import static org.hamcrest.Matchers.equalTo; @@ -86,30 +89,38 @@ public void testBulkOperations() throws Exception { int totalBulkReqs = randomIntBetween(2, 100); long totalDocs = 0; + Set ids = new HashSet<>(); for (int bulkReqs = 0; bulkReqs < totalBulkReqs; bulkReqs++) { BulkRequestBuilder bulkReqBuilder = client().prepareBulk(); int totalBulkSize = randomIntBetween(1, 100); for (int bulkSize = 0; bulkSize < totalBulkSize; bulkSize++) { - String id = Long.toString(totalDocs); + if (ids.size() > 0 && rarely(random())) { + String id = randomFrom(ids); + ids.remove(id); + DeleteRequestBuilder request = new DeleteRequestBuilder(client(), INDEX_NAME).setId(id); + bulkReqBuilder.add(request); + continue; + } + String id = Long.toString(totalDocs++); boolean isIndexRequest = randomBoolean(); Map source = new HashMap<>(); source.put("sparse_field", isIndexRequest && rarely() ? null : randomAlphaOfLengthBetween(0, 1000)); source.put("dense_field", isIndexRequest && rarely() ? null : randomAlphaOfLengthBetween(0, 1000)); if (isIndexRequest) { bulkReqBuilder.add(new IndexRequestBuilder(client()).setIndex(INDEX_NAME).setId(id).setSource(source)); - totalDocs++; + ids.add(id); } else { boolean isUpsert = randomBoolean(); UpdateRequestBuilder request = new UpdateRequestBuilder(client()).setIndex(INDEX_NAME).setDoc(source); - if (isUpsert || totalDocs == 0) { + if (isUpsert || ids.size() == 0) { request.setDocAsUpsert(true); - totalDocs++; } else { // Update already existing document - id = Long.toString(randomLongBetween(0, totalDocs - 1)); + id = randomFrom(ids); } request.setId(id); bulkReqBuilder.add(request); + ids.add(id); } } BulkResponse bulkResponse = bulkReqBuilder.get(); @@ -134,7 +145,7 @@ public void testBulkOperations() throws Exception { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true); SearchResponse searchResponse = client().search(new SearchRequest(INDEX_NAME).source(sourceBuilder)).get(); try { - assertThat(searchResponse.getHits().getTotalHits().value, equalTo(totalDocs)); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) ids.size())); } finally { searchResponse.decRef(); } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java index 13ce090f69fc6..3a27fbc27a955 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java @@ -26,6 +26,10 @@ public Set getFeatures() { @Override public Set getTestFeatures() { - return Set.of(SemanticTextFieldMapper.SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX, SemanticTextFieldMapper.SEMANTIC_TEXT_ZERO_SIZE_FIX); + return Set.of( + SemanticTextFieldMapper.SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX, + SemanticTextFieldMapper.SEMANTIC_TEXT_ZERO_SIZE_FIX, + SemanticTextFieldMapper.SEMANTIC_TEXT_DELETE_FIX + ); } } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java index f5ff04ab7c15f..50c1f1db107ab 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java @@ -412,8 +412,8 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons */ private Map> createFieldInferenceRequests(BulkShardRequest bulkShardRequest) { Map> fieldRequestsMap = new LinkedHashMap<>(); - int itemIndex = 0; - for (var item : bulkShardRequest.items()) { + for (int itemIndex = 0; itemIndex < bulkShardRequest.items().length; itemIndex++) { + var item = bulkShardRequest.items()[itemIndex]; if (item.getPrimaryResponse() != null) { // item was already aborted/processed by a filter in the chain upstream (e.g. security) continue; @@ -440,6 +440,7 @@ private Map> createFieldInferenceRequests(Bu // ignore delete request continue; } + final Map docMap = indexRequest.sourceAsMap(); for (var entry : fieldInferenceMap.values()) { String field = entry.getName(); @@ -481,7 +482,6 @@ private Map> createFieldInferenceRequests(Bu } } } - itemIndex++; } return fieldRequestsMap; } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java index 1fff6ffaad025..8dd5145fbf8dd 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java @@ -83,6 +83,7 @@ public class SemanticTextFieldMapper extends FieldMapper implements InferenceFieldMapper { public static final NodeFeature SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX = new NodeFeature("semantic_text.in_object_field_fix"); public static final NodeFeature SEMANTIC_TEXT_ZERO_SIZE_FIX = new NodeFeature("semantic_text.zero_size_fix"); + public static final NodeFeature SEMANTIC_TEXT_DELETE_FIX = new NodeFeature("semantic_text.delete_fix"); public static final String CONTENT_TYPE = "semantic_text"; diff --git a/x-pack/plugin/inference/src/yamlRestTest/resources/rest-api-spec/test/inference/30_semantic_text_inference.yml b/x-pack/plugin/inference/src/yamlRestTest/resources/rest-api-spec/test/inference/30_semantic_text_inference.yml index ae32ed1e4dcb9..e2f29d4ebd628 100644 --- a/x-pack/plugin/inference/src/yamlRestTest/resources/rest-api-spec/test/inference/30_semantic_text_inference.yml +++ b/x-pack/plugin/inference/src/yamlRestTest/resources/rest-api-spec/test/inference/30_semantic_text_inference.yml @@ -502,3 +502,55 @@ setup: - match: { _source.level_1.dense_field.text: "another inference test" } - exists: _source.level_1.dense_field.inference.chunks.0.embeddings - match: { _source.level_1.dense_field.inference.chunks.0.text: "another inference test" } + +--- +"Deletes on bulk operation": + - requires: + cluster_features: semantic_text.delete_fix + reason: Delete operations are properly applied when subsequent operations include a semantic text field. + + - do: + bulk: + index: test-index + refresh: true + body: | + {"index":{"_id": "1"}} + {"dense_field": ["you know, for testing", "now with chunks"]} + {"index":{"_id": "2"}} + {"dense_field": ["some more tests", "that include chunks"]} + + - do: + search: + index: test-index + body: + query: + semantic: + field: dense_field + query: "you know, for testing" + + - match: { hits.total.value: 2 } + - match: { hits.total.relation: eq } + - match: { hits.hits.0._source.dense_field.text: ["you know, for testing", "now with chunks"] } + - match: { hits.hits.1._source.dense_field.text: ["some more tests", "that include chunks"] } + + - do: + bulk: + index: test-index + refresh: true + body: | + {"delete":{ "_id": "2"}} + {"update":{"_id": "1"}} + {"doc":{"dense_field": "updated text"}} + + - do: + search: + index: test-index + body: + query: + semantic: + field: dense_field + query: "you know, for testing" + + - match: { hits.total.value: 1 } + - match: { hits.total.relation: eq } + - match: { hits.hits.0._source.dense_field.text: "updated text" }