Skip to content

Commit

Permalink
Fix handling of bulk requests with semantic text fields and delete ops (
Browse files Browse the repository at this point in the history
elastic#116942)

Previously, delete operations were not processed correctly when followed by operations containing semantic text fields. This issue caused the positions of subsequent operations in the items array to shift incorrectly by one.

This PR resolves the off-by-one shift in item positions and includes additional tests to ensure proper behavior.
  • Loading branch information
jimczi committed Nov 18, 2024
1 parent e4f4c95 commit 93f677f
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 11 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/116942.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 116942
summary: Fix handling of bulk requests with semantic text fields and delete ops
area: Relevance
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
Expand All @@ -30,8 +31,10 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.xpack.inference.mapper.SemanticTextFieldTests.randomSemanticTextInput;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -87,30 +90,38 @@ public void testBulkOperations() throws Exception {

int totalBulkReqs = randomIntBetween(2, 100);
long totalDocs = 0;
Set<String> ids = new HashSet<>();
for (int bulkReqs = 0; bulkReqs < totalBulkReqs; bulkReqs++) {
BulkRequestBuilder bulkReqBuilder = client().prepareBulk();
int totalBulkSize = randomIntBetween(1, 100);
for (int bulkSize = 0; bulkSize < totalBulkSize; bulkSize++) {
String id = Long.toString(totalDocs);
if (ids.size() > 0 && rarely(random())) {
String id = randomFrom(ids);
ids.remove(id);
DeleteRequestBuilder request = new DeleteRequestBuilder(client(), INDEX_NAME).setId(id);
bulkReqBuilder.add(request);
continue;
}
String id = Long.toString(totalDocs++);
boolean isIndexRequest = randomBoolean();
Map<String, Object> source = new HashMap<>();
source.put("sparse_field", isIndexRequest && rarely() ? null : randomSemanticTextInput());
source.put("dense_field", isIndexRequest && rarely() ? null : randomSemanticTextInput());
if (isIndexRequest) {
bulkReqBuilder.add(new IndexRequestBuilder(client()).setIndex(INDEX_NAME).setId(id).setSource(source));
totalDocs++;
ids.add(id);
} else {
boolean isUpsert = randomBoolean();
UpdateRequestBuilder request = new UpdateRequestBuilder(client()).setIndex(INDEX_NAME).setDoc(source);
if (isUpsert || totalDocs == 0) {
if (isUpsert || ids.size() == 0) {
request.setDocAsUpsert(true);
totalDocs++;
} else {
// Update already existing document
id = Long.toString(randomLongBetween(0, totalDocs - 1));
id = randomFrom(ids);
}
request.setId(id);
bulkReqBuilder.add(request);
ids.add(id);
}
}
BulkResponse bulkResponse = bulkReqBuilder.get();
Expand All @@ -135,7 +146,7 @@ public void testBulkOperations() throws Exception {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true);
SearchResponse searchResponse = client().search(new SearchRequest(INDEX_NAME).source(sourceBuilder)).get();
try {
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(totalDocs));
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) ids.size()));
} finally {
searchResponse.decRef();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public Set<NodeFeature> getFeatures() {
public Set<NodeFeature> getTestFeatures() {
return Set.of(
SemanticTextFieldMapper.SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX,
SemanticTextFieldMapper.SEMANTIC_TEXT_SINGLE_FIELD_UPDATE_FIX
SemanticTextFieldMapper.SEMANTIC_TEXT_SINGLE_FIELD_UPDATE_FIX,
SemanticTextFieldMapper.SEMANTIC_TEXT_DELETE_FIX
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,8 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons
*/
private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(BulkShardRequest bulkShardRequest) {
Map<String, List<FieldInferenceRequest>> fieldRequestsMap = new LinkedHashMap<>();
int itemIndex = 0;
for (var item : bulkShardRequest.items()) {
for (int itemIndex = 0; itemIndex < bulkShardRequest.items().length; itemIndex++) {
var item = bulkShardRequest.items()[itemIndex];
if (item.getPrimaryResponse() != null) {
// item was already aborted/processed by a filter in the chain upstream (e.g. security)
continue;
Expand All @@ -441,6 +441,7 @@ private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(Bu
// ignore delete request
continue;
}

final Map<String, Object> docMap = indexRequest.sourceAsMap();
for (var entry : fieldInferenceMap.values()) {
String field = entry.getName();
Expand Down Expand Up @@ -483,7 +484,6 @@ private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(Bu
}
}
}
itemIndex++;
}
return fieldRequestsMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public class SemanticTextFieldMapper extends FieldMapper implements InferenceFie
public static final NodeFeature SEMANTIC_TEXT_SEARCH_INFERENCE_ID = new NodeFeature("semantic_text.search_inference_id");
public static final NodeFeature SEMANTIC_TEXT_DEFAULT_ELSER_2 = new NodeFeature("semantic_text.default_elser_2");
public static final NodeFeature SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX = new NodeFeature("semantic_text.in_object_field_fix");

public static final NodeFeature SEMANTIC_TEXT_SINGLE_FIELD_UPDATE_FIX = new NodeFeature("semantic_text.single_field_update_fix");
public static final NodeFeature SEMANTIC_TEXT_DELETE_FIX = new NodeFeature("semantic_text.delete_fix");

public static final String CONTENT_TYPE = "semantic_text";
public static final String DEFAULT_ELSER_2_INFERENCE_ID = DEFAULT_ELSER_ID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,3 +624,55 @@ setup:
- match: { _source.level_1.dense_field.text: "another inference test" }
- exists: _source.level_1.dense_field.inference.chunks.0.embeddings
- match: { _source.level_1.dense_field.inference.chunks.0.text: "another inference test" }

---
"Deletes on bulk operation":
- requires:
cluster_features: semantic_text.delete_fix
reason: Delete operations are properly applied when subsequent operations include a semantic text field.

- do:
bulk:
index: test-index
refresh: true
body: |
{"index":{"_id": "1"}}
{"dense_field": ["you know, for testing", "now with chunks"]}
{"index":{"_id": "2"}}
{"dense_field": ["some more tests", "that include chunks"]}
- do:
search:
index: test-index
body:
query:
semantic:
field: dense_field
query: "you know, for testing"

- match: { hits.total.value: 2 }
- match: { hits.total.relation: eq }
- match: { hits.hits.0._source.dense_field.text: ["you know, for testing", "now with chunks"] }
- match: { hits.hits.1._source.dense_field.text: ["some more tests", "that include chunks"] }

- do:
bulk:
index: test-index
refresh: true
body: |
{"delete":{ "_id": "2"}}
{"update":{"_id": "1"}}
{"doc":{"dense_field": "updated text"}}
- do:
search:
index: test-index
body:
query:
semantic:
field: dense_field
query: "you know, for testing"

- match: { hits.total.value: 1 }
- match: { hits.total.relation: eq }
- match: { hits.hits.0._source.dense_field.text: "updated text" }

0 comments on commit 93f677f

Please sign in to comment.