Skip to content

Commit

Permalink
Fix handling of bulk requests with semantic text fields and delete operations
Browse files Browse the repository at this point in the history

Previously, delete operations were not processed correctly when followed by operations containing semantic text fields: the item index was only advanced for non-delete operations, causing the positions of all subsequent operations in the items array to shift by one.

This PR resolves the discrepancy and includes additional tests to ensure proper behavior.
  • Loading branch information
jimczi committed Nov 18, 2024
1 parent 4e17c61 commit 332b79a
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
Expand All @@ -30,8 +31,10 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.xpack.inference.mapper.SemanticTextFieldTests.randomSemanticTextInput;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -87,30 +90,38 @@ public void testBulkOperations() throws Exception {

int totalBulkReqs = randomIntBetween(2, 100);
long totalDocs = 0;
Set<String> ids = new HashSet<>();
for (int bulkReqs = 0; bulkReqs < totalBulkReqs; bulkReqs++) {
BulkRequestBuilder bulkReqBuilder = client().prepareBulk();
int totalBulkSize = randomIntBetween(1, 100);
for (int bulkSize = 0; bulkSize < totalBulkSize; bulkSize++) {
String id = Long.toString(totalDocs);
if (ids.size() > 0 && rarely(random())) {
String id = randomFrom(ids);
ids.remove(id);
DeleteRequestBuilder request = new DeleteRequestBuilder(client(), INDEX_NAME).setId(id);
bulkReqBuilder.add(request);
continue;
}
String id = Long.toString(totalDocs++);
boolean isIndexRequest = randomBoolean();
Map<String, Object> source = new HashMap<>();
source.put("sparse_field", isIndexRequest && rarely() ? null : randomSemanticTextInput());
source.put("dense_field", isIndexRequest && rarely() ? null : randomSemanticTextInput());
if (isIndexRequest) {
bulkReqBuilder.add(new IndexRequestBuilder(client()).setIndex(INDEX_NAME).setId(id).setSource(source));
totalDocs++;
ids.add(id);
} else {
boolean isUpsert = randomBoolean();
UpdateRequestBuilder request = new UpdateRequestBuilder(client()).setIndex(INDEX_NAME).setDoc(source);
if (isUpsert || totalDocs == 0) {
if (isUpsert || ids.size() == 0) {
request.setDocAsUpsert(true);
totalDocs++;
} else {
// Update already existing document
id = Long.toString(randomLongBetween(0, totalDocs - 1));
id = randomFrom(ids);
}
request.setId(id);
bulkReqBuilder.add(request);
ids.add(id);
}
}
BulkResponse bulkResponse = bulkReqBuilder.get();
Expand All @@ -135,7 +146,7 @@ public void testBulkOperations() throws Exception {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true);
SearchResponse searchResponse = client().search(new SearchRequest(INDEX_NAME).source(sourceBuilder)).get();
try {
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(totalDocs));
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) ids.size()));
} finally {
searchResponse.decRef();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public Set<NodeFeature> getFeatures() {
public Set<NodeFeature> getTestFeatures() {
return Set.of(
SemanticTextFieldMapper.SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX,
SemanticTextFieldMapper.SEMANTIC_TEXT_SINGLE_FIELD_UPDATE_FIX
SemanticTextFieldMapper.SEMANTIC_TEXT_SINGLE_FIELD_UPDATE_FIX,
SemanticTextFieldMapper.SEMANTIC_TEXT_DELETE_FIX
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,8 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons
*/
private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(BulkShardRequest bulkShardRequest) {
Map<String, List<FieldInferenceRequest>> fieldRequestsMap = new LinkedHashMap<>();
int itemIndex = 0;
for (var item : bulkShardRequest.items()) {
for (int itemIndex = 0; itemIndex < bulkShardRequest.items().length; itemIndex++) {
var item = bulkShardRequest.items()[itemIndex];
if (item.getPrimaryResponse() != null) {
// item was already aborted/processed by a filter in the chain upstream (e.g. security)
continue;
Expand All @@ -441,6 +441,7 @@ private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(Bu
// ignore delete request
continue;
}

final Map<String, Object> docMap = indexRequest.sourceAsMap();
for (var entry : fieldInferenceMap.values()) {
String field = entry.getName();
Expand Down Expand Up @@ -483,7 +484,6 @@ private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(Bu
}
}
}
itemIndex++;
}
return fieldRequestsMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public class SemanticTextFieldMapper extends FieldMapper implements InferenceFie
public static final NodeFeature SEMANTIC_TEXT_SEARCH_INFERENCE_ID = new NodeFeature("semantic_text.search_inference_id");
public static final NodeFeature SEMANTIC_TEXT_DEFAULT_ELSER_2 = new NodeFeature("semantic_text.default_elser_2");
public static final NodeFeature SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX = new NodeFeature("semantic_text.in_object_field_fix");

public static final NodeFeature SEMANTIC_TEXT_SINGLE_FIELD_UPDATE_FIX = new NodeFeature("semantic_text.single_field_update_fix");
public static final NodeFeature SEMANTIC_TEXT_DELETE_FIX = new NodeFeature("semantic_text.delete_fix");

public static final String CONTENT_TYPE = "semantic_text";
public static final String DEFAULT_ELSER_2_INFERENCE_ID = DEFAULT_ELSER_ID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,3 +624,55 @@ setup:
- match: { _source.level_1.dense_field.text: "another inference test" }
- exists: _source.level_1.dense_field.inference.chunks.0.embeddings
- match: { _source.level_1.dense_field.inference.chunks.0.text: "another inference test" }

---
"Deletes on bulk operation":
- requires:
cluster_features: semantic_text.delete_fix
reason: Delete operations are properly applied when subsequent operations include a semantic text field.

- do:
bulk:
index: test-index
refresh: true
body: |
{"index":{"_id": "1"}}
{"dense_field": ["you know, for testing", "now with chunks"]}
{"index":{"_id": "2"}}
{"dense_field": ["some more tests", "that include chunks"]}
- do:
search:
index: test-index
body:
query:
semantic:
field: dense_field
query: "you know, for testing"

- match: { hits.total.value: 2 }
- match: { hits.total.relation: eq }
- match: { hits.hits.0._source.dense_field.text: ["you know, for testing", "now with chunks"] }
- match: { hits.hits.1._source.dense_field.text: ["some more tests", "that include chunks"] }

- do:
bulk:
index: test-index
refresh: true
body: |
{"delete":{ "_id": "2"}}
{"update":{"_id": "1"}}
{"doc":{"dense_field": "updated text"}}
- do:
search:
index: test-index
body:
query:
semantic:
field: dense_field
query: "you know, for testing"

- match: { hits.total.value: 1 }
- match: { hits.total.relation: eq }
- match: { hits.hits.0._source.dense_field.text: "updated text" }

0 comments on commit 332b79a

Please sign in to comment.